Repository: incubator-beam Updated Branches: refs/heads/master 5747951ff -> e849d95d1
Fix SplittableParDoTest This required fixing GBKIntoKeyedWorkItems to properly set the coder on the primitive, and updating the assertions to match the actual (and correct) behavior. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/577d04ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/577d04ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/577d04ab Branch: refs/heads/master Commit: 577d04ab0799b18d5c2c88e2250859678f589968 Parents: b8e6eea Author: bchambers <bchamb...@google.com> Authored: Mon Oct 17 12:35:03 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Mon Oct 17 12:35:03 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/core/GBKIntoKeyedWorkItems.java | 17 ++++++++++++++++- .../beam/runners/core/SplittableParDoTest.java | 10 +++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java index ca4d681..304e349 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java @@ -17,10 +17,15 @@ */ package org.apache.beam.runners.core; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -34,7 +39,17 @@ public class GBKIntoKeyedWorkItems<KeyT, InputT> extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { @Override public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) { - return PCollection.createPrimitiveOutputInternal( + checkArgument(input.getCoder() instanceof KvCoder, + "Expected input coder to be KvCoder, but was %s", + input.getCoder().getClass().getSimpleName()); + + KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder(); + Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of( + kvCoder.getKeyCoder(), kvCoder.getValueCoder(), + input.getWindowingStrategy().getWindowFn().windowCoder()); + PCollection<KeyedWorkItem<KeyT, InputT>> collection = PCollection.createPrimitiveOutputInternal( input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + collection.setCoder((Coder) coder); + return collection; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index a76c4da..b7cdc64 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -128,12 +128,14 @@ public class SplittableParDoTest { Pipeline pipeline = TestPipeline.create(); DoFn<Integer, String> boundedFn = new BoundedFakeFn(); assertEquals( + "Applying a bounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) .apply("bounded to bounded", new SplittableParDo<>(boundedFn)) .isBounded()); assertEquals( - PCollection.IsBounded.BOUNDED, + "Applying a bounded SDF to an unbounded collection produces an unbounded collection", + PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) .apply("bounded to unbounded", new SplittableParDo<>(boundedFn)) .isBounded()); @@ -145,12 +147,14 @@ public class SplittableParDoTest { Pipeline pipeline = TestPipeline.create(); DoFn<Integer, String> unboundedFn = new UnboundedFakeFn(); assertEquals( - PCollection.IsBounded.BOUNDED, + "Applying an unbounded SDF to a bounded collection produces a bounded collection", + PCollection.IsBounded.UNBOUNDED, makeBoundedCollection(pipeline) .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn)) .isBounded()); assertEquals( - PCollection.IsBounded.BOUNDED, + "Applying an unbounded SDF to an unbounded collection produces an unbounded collection", + PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn)) .isBounded());