Repository: beam Updated Branches: refs/heads/master 0c24286e1 -> a26fd1ff3
Add some more RunnableOnService tests for stateful ParDo Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c00e912 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c00e912 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c00e912 Branch: refs/heads/master Commit: 6c00e9121e6572fc06d0379802883c118acbed9f Parents: f4e1097 Author: Kenneth Knowles <k...@google.com> Authored: Fri Jan 6 12:03:11 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 15:21:43 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ParDoTest.java | 138 +++++++++++++++++++ 1 file changed, 138 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6c00e912/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 2e3fb85..7381e06 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -80,6 +80,7 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; @@ -1502,6 +1503,104 @@ public class ParDoTest implements Serializable { @Test @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testValueStateFixedWindows() { + final String stateId = "foo"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @StateId(stateId) + private final StateSpec<Object, ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + c.output(currentValue); + state.write(currentValue + 1); + } + }; + + IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); + + PCollection<Integer> output = + pipeline + .apply( + Create.timestamped( + // first window + TimestampedValue.of(KV.of("hello", 7), new Instant(1)), + TimestampedValue.of(KV.of("hello", 14), new Instant(2)), + TimestampedValue.of(KV.of("hello", 21), new Instant(3)), + + // second window + TimestampedValue.of(KV.of("hello", 28), new Instant(11)), + TimestampedValue.of(KV.of("hello", 35), new Instant(13)))) + .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10)))) + .apply("Stateful ParDo", ParDo.of(fn)); + + PAssert.that(output).inWindow(firstWindow).containsInAnyOrder(0, 1, 2); + PAssert.that(output).inWindow(secondWindow).containsInAnyOrder(0, 1); + pipeline.run(); + } + + /** + * Tests that there is no state bleeding between adjacent stateful {@link ParDo} transforms, + * which may (or may not) be executed in similar contexts after runner optimizations. + */ + @Test + @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testValueStateSameId() { + final String stateId = "foo"; + + DoFn<KV<String, Integer>, KV<String, Integer>> fn = + new DoFn<KV<String, Integer>, KV<String, Integer>>() { + + @StateId(stateId) + private final StateSpec<Object, ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + c.output(KV.of("sizzle", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn<KV<String, Integer>, Integer> fn2 = + new DoFn<KV<String, Integer>, Integer>() { + + @StateId(stateId) + private final StateSpec<Object, ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + c.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection<KV<String, Integer>> intermediate = + pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("First stateful ParDo", ParDo.of(fn)); + + PCollection<Integer> output = + intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + + @Test + @Category({RunnableOnService.class, UsesStatefulParDo.class}) public void testValueStateSideOutput() { final String stateId = "foo"; @@ -1587,6 +1686,45 @@ public class ParDoTest implements Serializable { @Test @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testCombiningState() { + final String stateId = "foo"; + + DoFn<KV<String, Double>, String> fn = + new DoFn<KV<String, Double>, String>() { + + private static final double EPSILON = 0.0001; + + @StateId(stateId) + private final StateSpec< + Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>> + combiningState = + StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); + + @ProcessElement + public void processElement( + ProcessContext c, + @StateId(stateId) + AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) { + state.add(c.element().getValue()); + Double currentValue = state.read(); + if (Math.abs(currentValue - 0.5) < EPSILON) { + c.output("right on"); + } + } + }; + + PCollection<String> output = + pipeline + .apply(Create.of(KV.of("hello", 0.3), KV.of("hello", 0.6), KV.of("hello", 0.6))) + .apply(ParDo.of(fn)); + + // There should only be one moment at which the average is exactly 0.5 + PAssert.that(output).containsInAnyOrder("right on"); + pipeline.run(); + } + + @Test + @Category({RunnableOnService.class, UsesStatefulParDo.class}) public void testBagStateSideInput() { final PCollectionView<List<Integer>> listView =