mr-runner: remove the hard-coded GlobalWindow coder and fix WindowingTest.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d8b12a6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d8b12a6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d8b12a6 Branch: refs/heads/mr-runner Commit: 2d8b12a6ec6d58a01084c6c06e92b3f884c166ba Parents: ca0b15a Author: Pei He <p...@apache.org> Authored: Thu Aug 31 18:58:16 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 1 17:13:40 2017 +0800 ---------------------------------------------------------------------- .../translation/FileReadOperation.java | 11 ++++---- .../mapreduce/translation/GraphPlanner.java | 7 +++-- .../runners/mapreduce/translation/Graphs.java | 10 ++++++-- .../mapreduce/translation/JobPrototype.java | 2 +- .../translation/TranslationContext.java | 27 ++++++++++++++------ 5 files changed, 36 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java index a95e79e..cbbfbd2 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java @@ -47,12 +47,12 @@ import org.apache.hadoop.io.SequenceFile; public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> { private final String fileName; - private final Coder<?> coder; + private final Coder<WindowedValue<T>> coder; private final TupleTag<?> tupleTag; public FileReadOperation( String fileName, 
- Coder<T> coder, + Coder<WindowedValue<T>> coder, TupleTag<?> tupleTag) { super(); this.fileName = checkNotNull(fileName, "fileName"); @@ -73,11 +73,10 @@ public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>> { private final Coder<WindowedValue<T>> coder; private final SerializableConfiguration conf; - FileBoundedSource(String fileName, Coder<T> coder, SerializableConfiguration conf) { + FileBoundedSource( + String fileName, Coder<WindowedValue<T>> coder, SerializableConfiguration conf) { this.fileName = checkNotNull(fileName, "fileName"); - checkNotNull(coder, "coder"); - this.coder = WindowedValue.getFullCoder( - coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + this.coder = checkNotNull(coder, "coder"); this.conf = checkNotNull(conf, "conf"); } http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 608b304..6c79277 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -58,9 +58,8 @@ public class GraphPlanner { String tagName = tag.getName(); String fileName = ConfigurationUtils.toFileName(tagName); - // TODO: should not hard-code windows coder. 
WindowedValue.WindowedValueCoder<?> writeValueCoder = WindowedValue.getFullCoder( - tag.getCoder(), WindowingStrategy.globalDefault().getWindowFn().windowCoder()); + tag.getCoder(), tag.getWindowingStrategy().getWindowFn().windowCoder()); fusedStep.addStep( Graphs.Step.of( @@ -71,7 +70,7 @@ public class GraphPlanner { String readStepName = tagName + "/Read"; Graphs.Tag readOutput = Graphs.Tag.of( - readStepName + ".out", tag.getTupleTag(), tag.getCoder()); + readStepName + ".out", tag.getTupleTag(), tag.getCoder(), tag.getWindowingStrategy()); for (Graphs.FusedStep consumer : consumers) { // Re-direct tag to readOutput. List<Graphs.Step> receivers = consumer.getConsumers(tag); @@ -84,7 +83,7 @@ public class GraphPlanner { consumer.addStep( Graphs.Step.of( readStepName, - new FileReadOperation(filePath, tag.getCoder(), tag.getTupleTag())), + new FileReadOperation(filePath, writeValueCoder, tag.getTupleTag())), ImmutableList.<Graphs.Tag>of(), ImmutableList.of(readOutput)); } http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index b2f793a..0b93c3a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -25,6 +25,7 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; /** * Class that defines graph vertices. 
@@ -234,15 +235,20 @@ public class Graphs { abstract String getName(); abstract TupleTag<?> getTupleTag(); abstract Coder<?> getCoder(); + abstract WindowingStrategy<?, ?> getWindowingStrategy(); @Override public String toString() { return getName(); } - public static Tag of(String name, TupleTag<?> tupleTag, Coder<?> coder) { + public static Tag of( + String name, + TupleTag<?> tupleTag, + Coder<?> coder, + WindowingStrategy<?, ?> windowingStrategy) { return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag( - name, tupleTag, coder); + name, tupleTag, coder, windowingStrategy); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 93ae33a..44f279b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -117,7 +117,7 @@ public class JobPrototype { String reifyStepName = groupByKey.getFullName() + "-Reify"; Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy); Graphs.Tag reifyOutputTag = Graphs.Tag.of( - reifyStepName + ".out", new TupleTag<>(), reifyValueCoder); + reifyStepName + ".out", new TupleTag<>(), reifyValueCoder, windowingStrategy); Graphs.Step reifyStep = Graphs.Step.of( reifyStepName, new ReifyTimestampAndWindowsParDoOperation( http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- 
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index fd6c0ba..93856de 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -125,12 +125,16 @@ public class TranslationContext { if (pValue instanceof PCollection) { PCollection<?> pc = (PCollection<?>) pValue; return Graphs.Tag.of( - pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); - } else { + pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(), pc.getWindowingStrategy()); + } else if (pValue instanceof PCollectionView){ + PCollectionView pView = (PCollectionView) pValue; return Graphs.Tag.of( pValue.getName(), pValueToTupleTag.get(pValue), - ((PCollectionView) pValue).getCoderInternal()); + pView.getCoderInternal(), + pView.getWindowingStrategyInternal()); + } else { + throw new RuntimeException("Unexpected PValue: " + pValue.getClass()); } }}) .toList(); @@ -150,12 +154,17 @@ public class TranslationContext { if (pValue instanceof PCollection) { PCollection<?> pc = (PCollection<?>) pValue; return Graphs.Tag.of( - pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); - } else { + pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(), + pc.getWindowingStrategy()); + } else if (pValue instanceof PCollectionView){ + PCollectionView pView = (PCollectionView) pValue; return Graphs.Tag.of( pValue.getName(), pValueToTupleTag.get(pValue), - ((PCollectionView) pValue).getCoderInternal()); + pView.getCoderInternal(), + pView.getWindowingStrategyInternal()); + } else { + throw new RuntimeException("Unexpected PValue: " + pValue.getClass()); } }}) .toList(); @@ -165,14 +174,16 @@ public class TranslationContext { if (currentNode.getTransform() instanceof 
View.CreatePCollectionView) { PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView(); return ImmutableList.of( - Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal())); + Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal(), + view.getWindowingStrategyInternal())); } else { return FluentIterable.from(currentNode.getOutputs().entrySet()) .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() { @Override public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) { PCollection<?> pc = (PCollection<?>) entry.getValue(); - return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder()); + return Graphs.Tag.of( + pc.getName(), entry.getKey(), pc.getCoder(), pc.getWindowingStrategy()); }}) .toList(); }