BEAM-1438 Auto shard streaming sinks #1952
Refer to this link for build results (access rights to CI server needed): |
R: @tgroh |
Tests?
@tgroh suggestions on where test should go? I don't see where the tests for similar changes (e.g. replacing PubSub source/sink) are. |
It's not unreasonable to make the StreamingShardedWrite package-private and have a test for it.
Also, after the submission of #2039, the way this is applied should change - you'll have to insert this override at the front of the overrides map (I believe), and should use the PTransformMatchers.writeWithRunnerDeterminedSharding() matcher.
@reuvenlax any updates here? |
@reuvenlax, any updates perhaps? |
0038d59 to 689875c
689875c to ff6bb35
6e50172 to 4a605fd
WRT PipelineOptions at construction time, at the point a PipelineRunner is executing (including in |
4a605fd to 03e9fbf
3e2bbb2 to 36fceb5
Thanks!
assertThat(
    factory.getReplacementTransform(originalApplication).getTransform(),
    not(equalTo((Object) original)));
Verify that the replacement transform has the expected number of shards?
done
    AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
  return PTransformReplacement.of(
      PTransformReplacements.getSingletonMainInput(transform),
      transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2));
Add a comment here explaining why this is a good choice, and what effect it will have? (how many output files we'll actually get, and how many we would have gotten without this override)
done
retest this please |
@@ -1296,6 +1302,46 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
  }

  @VisibleForTesting
  static class StreamingShardedWriteFactory<T>
      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
    static final int DEFAULT_NUM_SHARDS = 10;
Comment on the reasoning here.
Done
WriteFiles<Object> replacement = (WriteFiles<Object>)
    factory.getReplacementTransform(originalApplication).getTransform();
assertThat(replacement, not(equalTo((Object) original)));
assertThat(replacement.getNumShards().get(), equalTo(20));
Can we test the default with unset "(max)NumWorkers"?
Done
Otherwise, LGTM |
retest this please. |
retest this please. |
retest this please |
Hey @reuvenlax any updates on this PR? |
…taflow runner default to maxNumWorkers * 2 shards.
0c5cd10 to 272a18a
@tgroh can you help figure out this test error? It's complaining that the Create hidden inside of WriteFiles has not been overridden.
@@ -337,6 +340,10 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
              new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
          .add(
              PTransformOverride.of(
                  PTransformMatchers.writeWithRunnerDeterminedSharding(),
For the test to pass, you need to move this override to occur before the Read.Bounded override.
Done
retest this please. |
retest this please |
If a Write requests runner-determined sharding, per-bundle sharding is the default but performs poorly in Dataflow's streaming runner. Instead, the runner statically picks a sharding based on the number of workers.
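A rough sketch of the static choice described above, based only on the snippets quoted in the review (`withNumShards(options.getMaxNumWorkers() * 2)` and `DEFAULT_NUM_SHARDS = 10`). The class and method names here are hypothetical, and treating a non-positive `maxNumWorkers` as "unset" is an assumption; the actual factory in the PR may handle the unset case differently.

```java
public final class StreamingShardCount {
    // Fallback value, mirroring DEFAULT_NUM_SHARDS = 10 from the diff.
    static final int DEFAULT_NUM_SHARDS = 10;

    private StreamingShardCount() {}

    /**
     * Hypothetical helper: picks a fixed shard count for a streaming write.
     * Assumption: maxNumWorkers <= 0 means the option was not set.
     */
    static int chooseNumShards(int maxNumWorkers) {
        if (maxNumWorkers > 0) {
            // Two shards per worker, as in the quoted withNumShards(...) call.
            return maxNumWorkers * 2;
        }
        return DEFAULT_NUM_SHARDS;
    }

    public static void main(String[] args) {
        System.out.println(chooseNumShards(10)); // maxNumWorkers set to 10
        System.out.println(chooseNumShards(0));  // maxNumWorkers unset
    }
}
```

With `maxNumWorkers` set to 10 this yields 20 shards, which matches the `equalTo(20)` assertion in the review thread; with the option unset it falls back to 10.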
This PR accidentally got closed. Reopening.
R: @jkff