Skip to content

Commit d37c99b

Browse files
committed
Adds tests for the watermark hold (previously untested)
1 parent 71f7afa commit d37c99b

File tree

2 files changed

+62
-0
lines changed

2 files changed

+62
-0
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ public K getKey() {
7272
return key;
7373
}
7474

75+
@Nullable
76+
public Instant getWatermarkHold() {
77+
for (State state : this.inMemoryState.values()) {
78+
if (state instanceof WatermarkHoldState) {
79+
return ((WatermarkHoldState<?>) state).read();
80+
}
81+
}
82+
return null;
83+
}
84+
7585
/**
7686
* Interface common to all in-memory state cells. Includes ability to see whether a cell has been
7787
* cleared and the ability to create a clone of the contents.

runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2121
import static org.hamcrest.Matchers.hasItem;
22+
import static org.hamcrest.Matchers.hasItems;
2223
import static org.hamcrest.Matchers.not;
2324
import static org.junit.Assert.assertEquals;
2425
import static org.junit.Assert.assertFalse;
@@ -36,6 +37,7 @@
3637
import org.apache.beam.sdk.Pipeline;
3738
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
3839
import org.apache.beam.sdk.coders.Coder;
40+
import org.apache.beam.sdk.coders.InstantCoder;
3941
import org.apache.beam.sdk.coders.SerializableCoder;
4042
import org.apache.beam.sdk.testing.TestPipeline;
4143
import org.apache.beam.sdk.transforms.Create;
@@ -334,6 +336,9 @@ List<OutputT> takeOutputElements() {
334336
return tester.takeOutputElements();
335337
}
336338

339+
public Instant getWatermarkHold() {
340+
return stateInternals.getWatermarkHold();
341+
}
337342
}
338343

339344
private static class OutputWindowedValueToDoFnTester<OutputT>
@@ -429,6 +434,53 @@ public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Ex
429434
}
430435
}
431436

437+
private static class WatermarkUpdateFn extends DoFn<Instant, String> {
438+
@ProcessElement
439+
public void process(ProcessContext c, OffsetRangeTracker tracker) {
440+
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
441+
c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
442+
c.output(String.valueOf(i));
443+
}
444+
}
445+
446+
@GetInitialRestriction
447+
public OffsetRange getInitialRestriction(Instant elem) {
448+
throw new IllegalStateException("Expected to be supplied explicitly in this test");
449+
}
450+
451+
@NewTracker
452+
public OffsetRangeTracker newTracker(OffsetRange range) {
453+
return new OffsetRangeTracker(range);
454+
}
455+
}
456+
457+
@Test
458+
public void testUpdatesWatermark() throws Exception {
459+
DoFn<Instant, String> fn = new WatermarkUpdateFn();
460+
Instant base = Instant.now();
461+
462+
ProcessFnTester<Instant, String, OffsetRange, OffsetRangeTracker> tester =
463+
new ProcessFnTester<>(
464+
base,
465+
fn,
466+
InstantCoder.of(),
467+
SerializableCoder.of(OffsetRange.class),
468+
3,
469+
MAX_BUNDLE_DURATION);
470+
471+
tester.startElement(base, new OffsetRange(0, 8));
472+
assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
473+
assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
474+
475+
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
476+
assertThat(tester.takeOutputElements(), hasItems("3", "4", "5"));
477+
assertEquals(base.plus(Duration.standardSeconds(5)), tester.getWatermarkHold());
478+
479+
assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
480+
assertThat(tester.takeOutputElements(), hasItems("6", "7"));
481+
assertEquals(null, tester.getWatermarkHold());
482+
}
483+
432484
/**
433485
* A splittable {@link DoFn} that generates the sequence [init, init + total).
434486
*/

0 commit comments

Comments
 (0)