Skip to content

Commit 272a18a

Browse files
committed
Address comments.
1 parent b3e0cc7 commit 272a18a

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,8 @@ public Map<PValue, ReplacementOutput> mapOutputs(
14371437
@VisibleForTesting
14381438
static class StreamingShardedWriteFactory<T>
14391439
implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
1440+
// We pick 10 as a a default, as it works well with the default number of workers started
1441+
// by Dataflow.
14401442
static final int DEFAULT_NUM_SHARDS = 10;
14411443
DataflowPipelineWorkerPoolOptions options;
14421444

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,16 @@ public void testWorkerHarnessContainerImage() {
11421142
public void testStreamingWriteWithNoShardingReturnsNewTransform() {
11431143
PipelineOptions options = TestPipeline.testingPipelineOptions();
11441144
options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
1145+
testStreamingWriteOverride(options, 20);
1146+
}
1147+
1148+
@Test
1149+
public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
1150+
PipelineOptions options = TestPipeline.testingPipelineOptions();
1151+
testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
1152+
}
1153+
1154+
private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
11451155
TestPipeline p = TestPipeline.fromOptions(options);
11461156

11471157
StreamingShardedWriteFactory<Object> factory =
@@ -1155,7 +1165,7 @@ public void testStreamingWriteWithNoShardingReturnsNewTransform() {
11551165
WriteFiles<Object> replacement = (WriteFiles<Object>)
11561166
factory.getReplacementTransform(originalApplication).getTransform();
11571167
assertThat(replacement, not(equalTo((Object) original)));
1158-
assertThat(replacement.getNumShards().get(), equalTo(20));
1168+
assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
11591169
}
11601170

11611171
private static class TestSink extends FileBasedSink<Object> {

0 commit comments

Comments
 (0)