Skip to content

BEAM-1438 Auto shard streaming sinks #1952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from

Conversation

reuvenlax
Copy link
Contributor

@reuvenlax reuvenlax commented Feb 8, 2017

If a Write requests runner-determined sharding, per-bundle sharding is the default but performs poorly in Dataflow's streaming runner. Instead, the runner statically picks a sharding based on the number of workers.

This PR accidentally got closed. Reopening.

R: @jkff

@asfbot
Copy link

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7207/
--none--

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 69.585% when pulling 0038d59 on reuvenlax:streaming_auto_shard_write into a25855a on apache:master.

@asfbot
Copy link

asfbot commented Feb 8, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/7208/
--none--

@dhalperi
Copy link
Contributor

R: @tgroh

Copy link
Member

@tgroh tgroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests?

@reuvenlax
Copy link
Contributor Author

@tgroh suggestions on where test should go? I don't see where the tests for similar changes (e.g. replacing PubSub source/sink) are.

Copy link
Member

@tgroh tgroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not unreasonable to make the StreamingShardedWrite package-private and have a test for it.

Also, after the submission of #2039, the way this is applied should change - you'll have to insert this override at the front of the overrides map (I believe), and should use the PTransformMatchers.writeWithRunnerDeterminedSharding() matcher

@aaltay
Copy link
Member

aaltay commented Mar 16, 2017

@reuvenlax any updates here?

@davorbonaci
Copy link
Member

@reuvenlax, any updates perhaps?

@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 0038d59 to 689875c Compare April 20, 2017 19:49
@reuvenlax reuvenlax changed the title BEAM-1438 Auto shard streaming sinks BEAM-1438 Allow dynamic sharding when doing windowed file-based writes May 4, 2017
@reuvenlax reuvenlax changed the title BEAM-1438 Allow dynamic sharding when doing windowed file-based writes BEAM-1438 Auto shard streaming sinks May 4, 2017
@reuvenlax reuvenlax closed this May 4, 2017
@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 689875c to ff6bb35 Compare May 4, 2017 12:25
@reuvenlax reuvenlax reopened this May 10, 2017
@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 6e50172 to 4a605fd Compare May 10, 2017 05:38
@coveralls
Copy link

Coverage Status

Coverage increased (+0.007%) to 70.491% when pulling 4a605fd on reuvenlax:streaming_auto_shard_write into 8312b6f on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.02%) to 70.5% when pulling 4a605fd on reuvenlax:streaming_auto_shard_write into 8312b6f on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.07%) to 70.553% when pulling 4a605fd on reuvenlax:streaming_auto_shard_write into 8312b6f on apache:master.

@jkff
Copy link
Contributor

jkff commented May 10, 2017

  • There's still that PubsubIO commit. Please rebase -i github/master and delete the commit; if that doesn't work, do something like: git checkout github/master; git cherry-pick $commit (for every useful commit in this PR); git checkout -B streaming_auto_shard_write
  • There's a lot of formatting changes (the bulk of this PRs diff is formatting changes), roll them back?
  • Should this apply also to other streaming runners - Spark, Flink, Apex?
  • I think we're trying to minimize use of PipelineOptions at construction time. Would it make sense to access options.getMaxNumWorkers() at runtime instead, via a do-once ParDo or something?

@tgroh
Copy link
Member

tgroh commented May 10, 2017

WRT PipelineOptions at construction time, at the point a PipelineRunner is executing (including in PipelineRunner#run methods), options are available, so that change is fine.

@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 4a605fd to 03e9fbf Compare May 11, 2017 07:12
@coveralls
Copy link

Coverage Status

Coverage increased (+0.007%) to 70.195% when pulling 03e9fbf on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 3e2bbb2 to 36fceb5 Compare May 11, 2017 08:35
@reuvenlax
Copy link
Contributor Author

  • PubSubIO commit is dropped
  • Rolled back unrelated formatting changes.
  • I don't want to necessarily apply this to other streaming runners, as I'm not 100% whether it's necessary there. I would prefer experts in those runner decide whether this makes sense, and apply this to those runner if necessary.
  • tgroh already addressed this. We are already inside PipelineRunner here.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 70.174% when pulling 36fceb5 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-7.0e-05%) to 70.188% when pulling 36fceb5 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.003%) to 70.192% when pulling 36fceb5 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.004%) to 70.185% when pulling 36fceb5 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


assertThat(
factory.getReplacementTransform(originalApplication).getTransform(),
not(equalTo((Object) original)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that the replacement transform has the expected number of shards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment here explaining why this is a good choice, and what effect it will have? (how many output files we'll actually get, and how many we would have gotten without this override)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@coveralls
Copy link

Coverage Status

Coverage increased (+0.02%) to 70.212% when pulling 92b31cb on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@tgroh
Copy link
Member

tgroh commented May 17, 2017

retest this please

@@ -1296,6 +1302,46 @@ private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
}

@VisibleForTesting
static class StreamingShardedWriteFactory<T>
implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
static final int DEFAULT_NUM_SHARDS = 10;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the reasoning here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

WriteFiles<Object> replacement = (WriteFiles<Object>)
factory.getReplacementTransform(originalApplication).getTransform();
assertThat(replacement, not(equalTo((Object) original)));
assertThat(replacement.getNumShards().get(), equalTo(20));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test the default with unset "(max)NumWorkers"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@tgroh
Copy link
Member

tgroh commented May 17, 2017

Otherwise, LGTM

@coveralls
Copy link

Coverage Status

Coverage increased (+0.5%) to 70.736% when pulling 0c5cd10 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.6%) to 70.757% when pulling 0c5cd10 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@reuvenlax
Copy link
Contributor Author

retest this please.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.6%) to 70.753% when pulling 0c5cd10 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@reuvenlax
Copy link
Contributor Author

retest this please.

@reuvenlax
Copy link
Contributor Author

retest this please

@coveralls
Copy link

Coverage Status

Coverage increased (+0.5%) to 70.656% when pulling 0c5cd10 on reuvenlax:streaming_auto_shard_write into 15bd3a3 on apache:master.

@jkff
Copy link
Contributor

jkff commented May 24, 2017

@aaltay
Copy link
Member

aaltay commented Jun 8, 2017

Hey @reuvenlax any updates on this PR?

@reuvenlax reuvenlax force-pushed the streaming_auto_shard_write branch from 0c5cd10 to 272a18a Compare June 18, 2017 06:18
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.002%) to 70.693% when pulling 272a18a on reuvenlax:streaming_auto_shard_write into 0cabdf6 on apache:master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 70.684% when pulling 4f6bace on reuvenlax:streaming_auto_shard_write into 0cabdf6 on apache:master.

@reuvenlax
Copy link
Contributor Author

@tgroh can you help figure out this test error? It's complaining that the Create hidden inside of WriteFiles has not bee overridden.

@@ -337,6 +340,10 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)))
.add(
PTransformOverride.of(
PTransformMatchers.writeWithRunnerDeterminedSharding(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the test to pass, you need to move this override to occur before the Read.Bounded override

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@reuvenlax
Copy link
Contributor Author

retest this please.

@jkff
Copy link
Contributor

jkff commented Jun 20, 2017

retest this please

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.01%) to 70.684% when pulling 9b9feb1 on reuvenlax:streaming_auto_shard_write into 0cabdf6 on apache:master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants