mr-runner: remove the hard-coded GlobalWindow coder, and fixes WindowingTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d8b12a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d8b12a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d8b12a6

Branch: refs/heads/mr-runner
Commit: 2d8b12a6ec6d58a01084c6c06e92b3f884c166ba
Parents: ca0b15a
Author: Pei He <p...@apache.org>
Authored: Thu Aug 31 18:58:16 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Fri Sep 1 17:13:40 2017 +0800

----------------------------------------------------------------------
 .../translation/FileReadOperation.java          | 11 ++++----
 .../mapreduce/translation/GraphPlanner.java     |  7 +++--
 .../runners/mapreduce/translation/Graphs.java   | 10 ++++++--
 .../mapreduce/translation/JobPrototype.java     |  2 +-
 .../translation/TranslationContext.java         | 27 ++++++++++++++------
 5 files changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index a95e79e..cbbfbd2 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -47,12 +47,12 @@ import org.apache.hadoop.io.SequenceFile;
 public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> {
 
   private final String fileName;
-  private final Coder<?> coder;
+  private final Coder<WindowedValue<T>> coder;
   private final TupleTag<?> tupleTag;
 
   public FileReadOperation(
       String fileName,
-      Coder<T> coder,
+      Coder<WindowedValue<T>> coder,
       TupleTag<?> tupleTag) {
     super();
     this.fileName = checkNotNull(fileName, "fileName");
@@ -73,11 +73,10 @@ public class FileReadOperation<T> extends 
ReadOperation<WindowedValue<T>> {
     private final Coder<WindowedValue<T>> coder;
     private final SerializableConfiguration conf;
 
-    FileBoundedSource(String fileName, Coder<T> coder, 
SerializableConfiguration conf) {
+    FileBoundedSource(
+        String fileName, Coder<WindowedValue<T>> coder, 
SerializableConfiguration conf) {
       this.fileName = checkNotNull(fileName, "fileName");
-      checkNotNull(coder, "coder");
-      this.coder = WindowedValue.getFullCoder(
-          coder, 
WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+      this.coder = checkNotNull(coder, "coder");
       this.conf = checkNotNull(conf, "conf");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 608b304..6c79277 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -58,9 +58,8 @@ public class GraphPlanner {
         String tagName = tag.getName();
         String fileName = ConfigurationUtils.toFileName(tagName);
 
-        // TODO: should not hard-code windows coder.
         WindowedValue.WindowedValueCoder<?> writeValueCoder = 
WindowedValue.getFullCoder(
-            tag.getCoder(), 
WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+            tag.getCoder(), 
tag.getWindowingStrategy().getWindowFn().windowCoder());
 
         fusedStep.addStep(
             Graphs.Step.of(
@@ -71,7 +70,7 @@ public class GraphPlanner {
 
         String readStepName = tagName + "/Read";
         Graphs.Tag readOutput = Graphs.Tag.of(
-            readStepName + ".out", tag.getTupleTag(), tag.getCoder());
+            readStepName + ".out", tag.getTupleTag(), tag.getCoder(), 
tag.getWindowingStrategy());
         for (Graphs.FusedStep consumer : consumers) {
           // Re-direct tag to readOutput.
           List<Graphs.Step> receivers = consumer.getConsumers(tag);
@@ -84,7 +83,7 @@ public class GraphPlanner {
           consumer.addStep(
               Graphs.Step.of(
                   readStepName,
-                  new FileReadOperation(filePath, tag.getCoder(), 
tag.getTupleTag())),
+                  new FileReadOperation(filePath, writeValueCoder, 
tag.getTupleTag())),
               ImmutableList.<Graphs.Tag>of(),
               ImmutableList.of(readOutput));
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index b2f793a..0b93c3a 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -25,6 +25,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Class that defines graph vertices.
@@ -234,15 +235,20 @@ public class Graphs {
     abstract String getName();
     abstract TupleTag<?> getTupleTag();
     abstract Coder<?> getCoder();
+    abstract WindowingStrategy<?, ?> getWindowingStrategy();
 
     @Override
     public String toString() {
       return getName();
     }
 
-    public static Tag of(String name, TupleTag<?> tupleTag, Coder<?> coder) {
+    public static Tag of(
+        String name,
+        TupleTag<?> tupleTag,
+        Coder<?> coder,
+        WindowingStrategy<?, ?> windowingStrategy) {
       return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag(
-          name, tupleTag, coder);
+          name, tupleTag, coder, windowingStrategy);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 93ae33a..44f279b 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -117,7 +117,7 @@ public class JobPrototype {
       String reifyStepName = groupByKey.getFullName() + "-Reify";
       Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), 
windowingStrategy);
       Graphs.Tag reifyOutputTag = Graphs.Tag.of(
-          reifyStepName + ".out", new TupleTag<>(), reifyValueCoder);
+          reifyStepName + ".out", new TupleTag<>(), reifyValueCoder, 
windowingStrategy);
       Graphs.Step reifyStep = Graphs.Step.of(
           reifyStepName,
           new ReifyTimestampAndWindowsParDoOperation(

http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index fd6c0ba..93856de 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -125,12 +125,16 @@ public class TranslationContext {
               if (pValue instanceof PCollection) {
                 PCollection<?> pc = (PCollection<?>) pValue;
                 return Graphs.Tag.of(
-                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
-              } else {
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(), 
pc.getWindowingStrategy());
+              } else if (pValue instanceof PCollectionView){
+                PCollectionView pView = (PCollectionView) pValue;
                 return Graphs.Tag.of(
                     pValue.getName(),
                     pValueToTupleTag.get(pValue),
-                    ((PCollectionView) pValue).getCoderInternal());
+                    pView.getCoderInternal(),
+                    pView.getWindowingStrategyInternal());
+              } else {
+                throw new RuntimeException("Unexpected PValue: " + 
pValue.getClass());
               }
             }})
           .toList();
@@ -150,12 +154,17 @@ public class TranslationContext {
               if (pValue instanceof PCollection) {
                 PCollection<?> pc = (PCollection<?>) pValue;
                 return Graphs.Tag.of(
-                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
-              } else {
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(),
+                    pc.getWindowingStrategy());
+              } else if (pValue instanceof PCollectionView){
+                PCollectionView pView = (PCollectionView) pValue;
                 return Graphs.Tag.of(
                     pValue.getName(),
                     pValueToTupleTag.get(pValue),
-                    ((PCollectionView) pValue).getCoderInternal());
+                    pView.getCoderInternal(),
+                    pView.getWindowingStrategyInternal());
+              } else {
+                throw new RuntimeException("Unexpected PValue: " + 
pValue.getClass());
               }
             }})
           .toList();
@@ -165,14 +174,16 @@ public class TranslationContext {
       if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
         PCollectionView view = ((View.CreatePCollectionView) 
currentNode.getTransform()).getView();
         return ImmutableList.of(
-            Graphs.Tag.of(view.getName(), view.getTagInternal(), 
view.getCoderInternal()));
+            Graphs.Tag.of(view.getName(), view.getTagInternal(), 
view.getCoderInternal(),
+                view.getWindowingStrategyInternal()));
       } else {
         return FluentIterable.from(currentNode.getOutputs().entrySet())
             .transform(new Function<Map.Entry<TupleTag<?>, PValue>, 
Graphs.Tag>() {
               @Override
               public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
                 PCollection<?> pc = (PCollection<?>) entry.getValue();
-                return Graphs.Tag.of(pc.getName(), entry.getKey(), 
pc.getCoder());
+                return Graphs.Tag.of(
+                    pc.getName(), entry.getKey(), pc.getCoder(), 
pc.getWindowingStrategy());
               }})
             .toList();
       }

Reply via email to