
Commit 4a605fd

Rollback extraneous change and fix tests.

1 parent e671904 · commit 4a605fd

3 files changed: +20 additions, −87 deletions

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 5 additions & 3 deletions

@@ -89,6 +89,7 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -1302,7 +1303,7 @@ public Map<PValue, ReplacementOutput> mapOutputs(
 
   @VisibleForTesting
   static class StreamingShardedWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
+      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
     DataflowPipelineWorkerPoolOptions options;
 
     StreamingShardedWriteFactory(PipelineOptions options) {
@@ -1311,14 +1312,15 @@ static class StreamingShardedWriteFactory<T>
 
     @Override
     public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<T>, PDone, Write<T>> transform) {
+        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
           transform.getTransform().withNumShards(options.getMaxNumWorkers() * 2));
     }
 
     @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PDone newOutput) {
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
+        PDone newOutput) {
       return Collections.emptyMap();
     }
   }

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java

Lines changed: 15 additions & 46 deletions

@@ -18,14 +18,12 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.runners.dataflow.DataflowRunner.getContainerImageForJob;
-import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -52,17 +50,14 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.Reader;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.nio.CharBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,26 +66,27 @@
 import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -101,6 +97,7 @@
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
@@ -1130,59 +1127,31 @@ public void testWorkerHarnessContainerImage() {
     assertThat(getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
   }
 
-  static class SimpleFilenamePolicy extends FilenamePolicy {
-    String baseLocation;
-
-    SimpleFilenamePolicy(String baseLocation) {
-      this.baseLocation = baseLocation;
-    }
-
-    @Override
-    public String windowedFilename(WindowedContext c) {
-      return baseLocation + "-shard-" + c.getShardNumber();
-    }
-
-    @Override
-    public String unwindowedFilename(Context c) {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(baseLocation);
-    }
-  };
-
-  PipelineOptions getOptions() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.setRunner(DataflowRunner.class);
-    options.as(DataflowPipelineOptions.class).setStreaming(true);
-    options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
-    return options;
-  }
-  @Rule public final transient TestPipeline p = TestPipeline.fromOptions(getOptions());
-
   @Test
   public void testStreamingWriteWithNoShardingReturnsNewTransform() {
     TestPipeline p = TestPipeline.create();
     StreamingShardedWriteFactory<Object> factory = new StreamingShardedWriteFactory(p.getOptions());
-    Write<Object> original = Write.to(new TestSink());
+    WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
-    AppliedPTransform<PCollection<Object>, PDone, Write<Object>> originalApplication =
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
         AppliedPTransform.of(
-            "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+            "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
 
     assertThat(
        factory.getReplacementTransform(originalApplication).getTransform(),
        not(equalTo((Object) original)));
   }
 
-  private static class TestSink extends Sink<Object> {
+  private static class TestSink extends FileBasedSink<Object> {
    @Override
    public void validate(PipelineOptions options) {}
 
+    TestSink(String tmpFolder) {
+      super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
+          null);
+    }
    @Override
-    public WriteOperation<Object, ?> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<Object> createWriteOperation() {
      throw new IllegalArgumentException("Should not be used");
    }
  }
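
Note: one detail in the rewritten test worth calling out is that FileBasedSink expects its base output location as a ResourceId, which the new TestSink constructor builds from a temp folder via FileSystems.matchNewResource before wrapping it in StaticValueProvider.of(...). A minimal sketch of that conversion follows; the helper class and the example folder name are assumptions for illustration, not code from this commit:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

class ResourceIdSketch {
  // The boolean flag marks the new resource as a directory, so the sink can
  // resolve its output files underneath it.
  static ResourceId baseDirectory(String tmpFolder) {
    return FileSystems.matchNewResource(tmpFolder, true /* isDirectory */);
  }
}

Called with something like "/tmp/junit-output/", this yields the directory ResourceId that the test passes to the FileBasedSink constructor.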

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 0 additions & 38 deletions

@@ -154,43 +154,6 @@ private static void populateCommonDisplayData(DisplayData.Builder builder,
     }
   }
 
-  /**
-   * Class representing a Pub/Sub message. Each message contains a single message payload and
-   * a map of attached attributes.
-   */
-  public static class PubsubMessage {
-    private byte[] message;
-    private Map<String, String> attributes;
-
-    public PubsubMessage(byte[] message, Map<String, String> attributes) {
-      this.message = message;
-      this.attributes = attributes;
-    }
-
-    /**
-     * Returns the main PubSub message.
-     */
-    public byte[] getMessage() {
-      return message;
-    }
-
-    /**
-     * Returns the given attribute value. If not such attribute exists, returns null.
-     */
-    @Nullable
-    public String getAttribute(String attribute) {
-      checkNotNull(attribute, "attribute");
-      return attributes.get(attribute);
-    }
-
-    /**
-     * Returns the full map of attributes. This is an unmodifiable map.
-     */
-    public Map<String, String> getAttributeMap() {
-      return attributes;
-    }
-  }
-
   /**
    * Class representing a Cloud Pub/Sub Subscription.
    */
@@ -890,7 +853,6 @@ public PDone expand(PCollection<T> input) {
             getTimestampAttribute(),
             getIdAttribute(),
             100 /* numShards */));
-
       }
       throw new RuntimeException(); // cases are exhaustive.
     }
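
Note: the inner class deleted here appears to duplicate the standalone org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage that DataflowRunner.java imports in the first hunk above, which is presumably why it is rolled back as an extraneous change. A hedged usage sketch of that standalone class follows; the accessor names (getPayload, getAttribute) and example values are assumptions about the standalone API, not lines from this diff:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

class PubsubMessageSketch {
  static PubsubMessage example() {
    Map<String, String> attributes = Collections.singletonMap("source", "sketch");
    PubsubMessage message =
        new PubsubMessage("payload".getBytes(StandardCharsets.UTF_8), attributes);
    // getAttribute returns null when the key is absent, matching the behavior
    // the removed inner class documented for its own getAttribute.
    byte[] payload = message.getPayload();
    String source = message.getAttribute("source");
    return message;
  }
}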
