Repository: incubator-beam
Updated Branches:
  refs/heads/master 5747951ff -> e849d95d1


Fix SplittableParDoTest

This required fixing GBKIntoKeyedWorkItems to properly set the coder on
the primitive, and updating the assertions to match the actual (and
correct) behavior.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/577d04ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/577d04ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/577d04ab

Branch: refs/heads/master
Commit: 577d04ab0799b18d5c2c88e2250859678f589968
Parents: b8e6eea
Author: bchambers <bchamb...@google.com>
Authored: Mon Oct 17 12:35:03 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Mon Oct 17 12:35:03 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/GBKIntoKeyedWorkItems.java   | 17 ++++++++++++++++-
 .../beam/runners/core/SplittableParDoTest.java     | 10 +++++++---
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
index ca4d681..304e349 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
@@ -17,10 +17,15 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -34,7 +39,17 @@ public class GBKIntoKeyedWorkItems<KeyT, InputT>
     extends PTransform<PCollection<KV<KeyT, InputT>>, 
PCollection<KeyedWorkItem<KeyT, InputT>>> {
   @Override
   public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, 
InputT>> input) {
-    return PCollection.createPrimitiveOutputInternal(
+    checkArgument(input.getCoder() instanceof KvCoder,
+        "Expected input coder to be KvCoder, but was %s",
+        input.getCoder().getClass().getSimpleName());
+
+    KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
+    Coder<KeyedWorkItem<KeyT, InputT>> coder = KeyedWorkItemCoder.of(
+        kvCoder.getKeyCoder(), kvCoder.getValueCoder(),
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+    PCollection<KeyedWorkItem<KeyT, InputT>> collection = 
PCollection.createPrimitiveOutputInternal(
         input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    collection.setCoder((Coder) coder);
+    return collection;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/577d04ab/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index a76c4da..b7cdc64 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -128,12 +128,14 @@ public class SplittableParDoTest {
     Pipeline pipeline = TestPipeline.create();
     DoFn<Integer, String> boundedFn = new BoundedFakeFn();
     assertEquals(
+        "Applying a bounded SDF to a bounded collection produces a bounded 
collection",
         PCollection.IsBounded.BOUNDED,
         makeBoundedCollection(pipeline)
             .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
             .isBounded());
     assertEquals(
-        PCollection.IsBounded.BOUNDED,
+        "Applying a bounded SDF to an unbounded collection produces an 
unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
             .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
             .isBounded());
@@ -145,12 +147,14 @@ public class SplittableParDoTest {
     Pipeline pipeline = TestPipeline.create();
     DoFn<Integer, String> unboundedFn = new UnboundedFakeFn();
     assertEquals(
-        PCollection.IsBounded.BOUNDED,
+        "Applying an unbounded SDF to a bounded collection produces a bounded 
collection",
+        PCollection.IsBounded.UNBOUNDED,
         makeBoundedCollection(pipeline)
             .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
             .isBounded());
     assertEquals(
-        PCollection.IsBounded.BOUNDED,
+        "Applying an unbounded SDF to an unbounded collection produces an 
unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
             .apply("unbounded to unbounded", new 
SplittableParDo<>(unboundedFn))
             .isBounded());

Reply via email to