mr-runner: add ParDoOperation and support ParDos chaining.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/389b02b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/389b02b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/389b02b5

Branch: refs/heads/mr-runner
Commit: 389b02b576e1d9ea5123905048de3004e462a89a
Parents: 0cbdc5b
Author: Pei He <p...@apache.org>
Authored: Tue Jul 25 21:44:34 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/mapreduce/MapReduceRunner.java |   6 +-
 .../mapreduce/translation/BeamMapper.java       |  57 ++-----
 .../runners/mapreduce/translation/Graph.java    |  71 +++++----
 .../mapreduce/translation/GraphConverter.java   |  29 ++--
 .../mapreduce/translation/GraphPlanner.java     |  25 ++--
 .../mapreduce/translation/JobPrototype.java     |  71 +++++----
 .../mapreduce/translation/OutputReceiver.java   |  53 +++++++
 .../mapreduce/translation/ParDoOperation.java   | 149 +++++++++++++++++++
 .../translation/SerializedPipelineOptions.java  |  76 ++++++++++
 .../mapreduce/translation/package-info.java     |  22 +++
 .../beam/runners/mapreduce/WordCountTest.java   |   4 +-
 11 files changed, 438 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 0e3142c..11ac9a7 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -3,7 +3,6 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
-import java.io.IOException;
 import org.apache.beam.runners.mapreduce.translation.Graph;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
@@ -48,10 +47,7 @@ public class MapReduceRunner extends 
PipelineRunner<PipelineResult> {
     GraphPlanner planner = new GraphPlanner();
     Graph fusedGraph = planner.plan(graph);
     for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
-      if (vertex.getTransform() instanceof GroupByKey
-          || vertex.getTransform() instanceof Read.Bounded) {
-        continue;
-      } else {
+      if (vertex.getStep().getTransform() instanceof GroupByKey) {
         JobPrototype jobPrototype = JobPrototype.create(1, vertex);
         try {
           Job job = jobPrototype.build(options.getJarClass(), new 
Configuration());

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 9d2f80d..b74797d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -2,17 +2,8 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
@@ -24,29 +15,19 @@ import org.slf4j.LoggerFactory;
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>> {
 
-  public static final String BEAM_SERIALIZED_DO_FN = "beam-serialized-do-fn";
-  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
+  public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = 
"beam-serialized-par-do-op";
 
-  private DoFnRunner<ValueInT, ValueOutT> doFnRunner;
-  private PipelineOptions options;
+  private ParDoOperation parDoOperation;
 
   @Override
   protected void setup(
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
-    String serializedDoFn = checkNotNull(
-        context.getConfiguration().get(BEAM_SERIALIZED_DO_FN),
-        BEAM_SERIALIZED_DO_FN);
-    doFnRunner = DoFnRunners.simpleRunner(
-        options,
-        (DoFn<ValueInT, ValueOutT>) SerializableUtils
-            .deserializeFromByteArray(
-                Base64.decodeBase64(serializedDoFn), "DoFn"),
-        NullSideInputReader.empty(),
-        new MROutputManager(context),
-        null,
-        ImmutableList.<TupleTag<?>>of(),
-        null,
-        WindowingStrategy.globalDefault());
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_SERIALIZED_PAR_DO_OPERATION),
+        BEAM_SERIALIZED_PAR_DO_OPERATION);
+    parDoOperation = (ParDoOperation) 
SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "DoFn");
+    parDoOperation.start();
   }
 
   @Override
@@ -54,30 +35,12 @@ public class BeamMapper<ValueInT, ValueOutT>
       Object key,
       WindowedValue<ValueInT> value,
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
-    LOG.info("key: {}, value: {}.", key, value);
-    doFnRunner.processElement(value);
+    parDoOperation.process(value);
   }
 
   @Override
   protected void cleanup(
       Mapper<Object, WindowedValue<ValueInT>, Object, 
WindowedValue<ValueOutT>>.Context context) {
-  }
-
-  class MROutputManager implements DoFnRunners.OutputManager {
-
-    private final Mapper<Object, Object, Object, Object>.Context context;
-
-    MROutputManager(Mapper<?, ?, ?, ?>.Context context) {
-      this.context = (Mapper<Object, Object, Object, Object>.Context) context;
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      try {
-        context.write("global", output);
-      } catch (Exception e) {
-        Throwables.throwIfUnchecked(e);
-      }
-    }
+    parDoOperation.finish();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index da31f89..6ea774b 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -18,6 +18,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
@@ -26,7 +27,7 @@ import org.apache.commons.lang.builder.ToStringStyle;
  */
 public class Graph {
 
-  private final Map<PTransform, Vertex> vertices;
+  private final Map<Step, Vertex> vertices;
   private final Map<HeadTail, Edge> edges;
   private final Set<Vertex> leafVertices;
 
@@ -36,10 +37,10 @@ public class Graph {
     this.leafVertices = Sets.newHashSet();
   }
 
-  public Vertex addVertex(PTransform<?, ?> transform) {
-    checkState(!vertices.containsKey(transform));
-    Vertex v = new Vertex(transform);
-    vertices.put(transform, v);
+  public Vertex addVertex(Step step) {
+    checkState(!vertices.containsKey(step));
+    Vertex v = new Vertex(step);
+    vertices.put(step, v);
     leafVertices.add(v);
     return v;
   }
@@ -55,8 +56,8 @@ public class Graph {
     return e;
   }
 
-  public Vertex getVertex(PTransform<?, ?> transform) {
-    return vertices.get(transform);
+  public Vertex getVertex(Step step) {
+    return vertices.get(step);
   }
 
   public Edge getEdge(Vertex head, Vertex tail) {
@@ -84,18 +85,18 @@ public class Graph {
   //TODO: add equals, hashCode, toString for following classses.
 
   public static class Vertex {
-    private final PTransform<?, ?> transform;
+    private final Step step;
     private final Set<Edge> incoming;
     private final Set<Edge> outgoing;
 
-    public Vertex(PTransform transform) {
-      this.transform = checkNotNull(transform, "transform");
+    public Vertex(Step step) {
+      this.step = checkNotNull(step, "step");
       this.incoming = Sets.newHashSet();
       this.outgoing = Sets.newHashSet();
     }
 
-    public PTransform<?, ?> getTransform() {
-      return transform;
+    public Step getStep() {
+      return step;
     }
 
     public Set<Edge> getIncoming() {
@@ -107,11 +108,12 @@ public class Graph {
     }
 
     public boolean isSource() {
+      PTransform<?, ?> transform = step.getTransform();
       return transform instanceof Read.Bounded || transform instanceof 
Read.Unbounded;
     }
 
     public boolean isGroupByKey() {
-      return transform instanceof GroupByKey;
+      return step.getTransform() instanceof GroupByKey;
     }
 
     public void addIncoming(Edge edge) {
@@ -123,6 +125,7 @@ public class Graph {
     }
 
     public void accept(GraphVisitor visitor) {
+      PTransform<?, ?> transform = step.getTransform();
       if (transform instanceof ParDo.SingleOutput || transform instanceof 
ParDo.MultiOutput) {
         visitor.visitParDo(this);
       } else if (transform instanceof GroupByKey) {
@@ -144,14 +147,14 @@ public class Graph {
       }
       if (obj instanceof Vertex) {
         Vertex other = (Vertex) obj;
-        return transform.equals(other.transform);
+        return step.equals(other.step);
       }
       return false;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(this.getClass(), transform);
+      return Objects.hash(this.getClass(), step);
     }
 
     @Override
@@ -218,7 +221,7 @@ public class Graph {
   }
 
   public static class NodePath {
-    private final LinkedList<PTransform<?, ?>> path;
+    private final LinkedList<Step> path;
 
     public NodePath() {
       this.path = new LinkedList<>();
@@ -228,16 +231,16 @@ public class Graph {
       this.path = new LinkedList<>(nodePath.path);
     }
 
-    public void addFirst(PTransform<?, ?> transform) {
-      path.addFirst(transform);
+    public void addFirst(Step step) {
+      path.addFirst(step);
     }
 
-    public void addLast(PTransform<?, ?> transform) {
-      path.addLast(transform);
+    public void addLast(Step step) {
+      path.addLast(step);
     }
 
-    public Iterable<PTransform<?, ?>> transforms() {
-      return path;
+    public Iterable<Step> steps() {
+      return ImmutableList.copyOf(path);
     }
 
     @Override
@@ -260,15 +263,33 @@ public class Graph {
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
-      for (PTransform<?, ?> collect : path) {
-        sb.append(collect.getName() + "|");
+      for (Step step : path) {
+        sb.append(step.getFullName() + "|");
       }
-      // sb.deleteCharAt(sb.length() - 1);
+      sb.deleteCharAt(sb.length() - 1);
       return sb.toString();
     }
   }
 
   @AutoValue
+  public abstract static class Step {
+    abstract String getFullName();
+    // TODO: remove public
+    public abstract PTransform<?, ?> getTransform();
+    abstract List<TupleTag<?>> getInputs();
+    abstract List<TupleTag<?>> getOutputs();
+
+    public static Step of(
+        String fullName,
+        PTransform<?, ?> transform,
+        List<TupleTag<?>> inputs,
+        List<TupleTag<?>> outputs) {
+      return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step(
+          fullName, transform, inputs, outputs);
+    }
+  }
+
+  @AutoValue
   public abstract static class HeadTail {
     abstract Vertex getHead();
     abstract Vertex getTail();

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index 359a6e2..bd56ac5 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -1,40 +1,49 @@
 package org.apache.beam.runners.mapreduce.translation;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import java.util.Map;
-import java.util.Set;
 import org.apache.beam.runners.mapreduce.MapReduceRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Pipeline translator for {@link MapReduceRunner}.
  */
 public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
-  private final Map<PValue, Graph.Vertex> outputToProducer;
+  private final Map<PValue, TupleTag<?>> pValueToTupleTag;
+  private final Map<TupleTag<?>, Graph.Vertex> outputToProducer;
   private final Graph graph;
 
   public GraphConverter() {
+    this.pValueToTupleTag = Maps.newHashMap();
     this.outputToProducer = Maps.newHashMap();
     this.graph = new Graph();
   }
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    Graph.Vertex v = graph.addVertex(node.getTransform());
-
-    for (PValue input : node.getInputs().values()) {
-      if (outputToProducer.containsKey(input)) {
-        Graph.Vertex producer = outputToProducer.get(input);
+    Graph.Step step = Graph.Step.of(
+        node.getFullName(),
+        node.getTransform(),
+        ImmutableList.copyOf(node.getInputs().keySet()),
+        ImmutableList.copyOf(node.getOutputs().keySet()));
+    Graph.Vertex v = graph.addVertex(step);
+
+    for (PValue pValue : node.getInputs().values()) {
+      TupleTag<?> tag = pValueToTupleTag.get(pValue);
+      if (outputToProducer.containsKey(tag)) {
+        Graph.Vertex producer = outputToProducer.get(tag);
         graph.addEdge(producer, v);
       }
     }
 
-    for (PValue output : node.getOutputs().values()) {
-      outputToProducer.put(output, v);
+    for (Map.Entry<TupleTag<?>, PValue> entry : node.getOutputs().entrySet()) {
+      pValueToTupleTag.put(entry.getValue(), entry.getKey());
+      outputToProducer.put(entry.getKey(), v);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 793efd7..9198b28 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -31,23 +31,24 @@ public class GraphPlanner {
         // drop if read is leaf vertex.
         return;
       }
-      Graph.Vertex v = fusedGraph.addVertex(read.getTransform());
-      workingPath.addFirst(read.getTransform());
+      Graph.Vertex v = fusedGraph.addVertex(read.getStep());
+      workingPath.addFirst(read.getStep());
       Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
       edge.addPath(workingPath);
     }
 
     @Override
     public void visitParDo(Graph.Vertex parDo) {
+      Graph.Step step = parDo.getStep();
       checkArgument(
-          parDo.getTransform().getAdditionalInputs().isEmpty(),
-          "Side inputs are not supported.");
+          step.getTransform().getAdditionalInputs().isEmpty(),
+          "Side inputs are not " + "supported.");
       if (workingVertex == null) {
         // Leaf vertex
-        workingVertex = fusedGraph.addVertex(parDo.getTransform());
+        workingVertex = fusedGraph.addVertex(step);
         workingPath = new Graph.NodePath();
       } else {
-        workingPath.addFirst(parDo.getTransform());
+        workingPath.addFirst(step);
       }
       checkArgument(
           parDo.getIncoming().size() == 1,
@@ -74,10 +75,11 @@ public class GraphPlanner {
       if (workingVertex == null) {
         return;
       }
-      Graph.Vertex v = fusedGraph.addVertex(groupByKey.getTransform());
-      workingPath.addFirst(groupByKey.getTransform());
-      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      Graph.Step step = groupByKey.getStep();
+      Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step);
+      Graph.Edge edge = fusedGraph.addEdge(addedGroupByKey, workingVertex);
       edge.addPath(workingPath);
+      workingVertex = addedGroupByKey;
       processParent(groupByKey.getIncoming().iterator().next().getHead());
     }
 
@@ -86,13 +88,14 @@ public class GraphPlanner {
     }
 
     private void processParent(Graph.Vertex parent) {
-      Graph.Vertex v = fusedGraph.getVertex(parent.getTransform());
+      Graph.Step step = parent.getStep();
+      Graph.Vertex v = fusedGraph.getVertex(step);
       if (v == null) {
         parent.accept(this);
       } else {
         // TODO: parent is consumed more than once.
         // It is duplicated in multiple outgoing path. Figure out the impact.
-        workingPath.addFirst(parent.getTransform());
+        workingPath.addFirst(step);
         fusedGraph.getEdge(v, workingVertex).addPath(workingPath);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index bdbbe5d..7cdf697 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -4,21 +4,23 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -51,8 +53,9 @@ public class JobPrototype {
     // Setup BoundedSources in BeamInputFormat.
     // TODO: support more than one inputs
     Graph.Vertex head = 
Iterables.getOnlyElement(vertex.getIncoming()).getHead();
-    checkState(head.getTransform() instanceof Read.Bounded);
-    Read.Bounded read = (Read.Bounded) head.getTransform();
+    Graph.Step headStep = head.getStep();
+    checkState(headStep.getTransform() instanceof Read.Bounded);
+    Read.Bounded read = (Read.Bounded) headStep.getTransform();
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
         
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource())));
@@ -62,34 +65,52 @@ public class JobPrototype {
     // TODO: support more than one out going edge.
     Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing());
     Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
-    List<DoFn> doFns = new ArrayList<>();
-    doFns.addAll(FluentIterable.from(outPath.transforms())
-        .filter(new Predicate<PTransform<?, ?>>() {
+    List<Graph.Step> parDos = new ArrayList<>();
+    parDos.addAll(FluentIterable.from(outPath.steps())
+        .filter(new Predicate<Graph.Step>() {
           @Override
-          public boolean apply(PTransform<?, ?> input) {
-            return !(input instanceof Read.Bounded);
-          }
-        })
-        .transform(new Function<PTransform<?, ?>, DoFn>() {
-          @Override
-          public DoFn apply(PTransform<?, ?> input) {
-            checkArgument(
-                input instanceof ParDo.SingleOutput, "Only support 
ParDo.SingleOutput.");
-            ParDo.SingleOutput parDo = (ParDo.SingleOutput) input;
-            return parDo.getFn();
+          public boolean apply(Graph.Step input) {
+            PTransform<?, ?> transform = input.getTransform();
+            return transform instanceof ParDo.SingleOutput
+                || transform instanceof ParDo.MultiOutput;
           }})
         .toList());
-    if (vertex.getTransform() instanceof ParDo.SingleOutput) {
-      doFns.add(((ParDo.SingleOutput) vertex.getTransform()).getFn());
-    } else if (vertex.getTransform() instanceof ParDo.MultiOutput) {
-      doFns.add(((ParDo.MultiOutput) vertex.getTransform()).getFn());
+    Graph.Step vertexStep = vertex.getStep();
+    if (vertexStep.getTransform() instanceof ParDo.SingleOutput
+        || vertexStep.getTransform() instanceof ParDo.MultiOutput) {
+      parDos.add(vertexStep);
+    }
+
+    ParDoOperation root = null;
+    ParDoOperation prev = null;
+    for (Graph.Step step : parDos) {
+      ParDoOperation current = new ParDoOperation(
+          getDoFn(step.getTransform()),
+          PipelineOptionsFactory.create(),
+          (TupleTag<Object>) step.getOutputs().iterator().next(),
+          ImmutableList.<TupleTag<?>>of(),
+          WindowingStrategy.globalDefault());
+      if (root == null) {
+        root = current;
+      } else {
+        // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero.
+        current.attachInput(prev, 0);
+      }
+      prev = current;
     }
     conf.set(
-        BeamMapper.BEAM_SERIALIZED_DO_FN,
-        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
-            Iterables.getOnlyElement(doFns))));
+        BeamMapper.BEAM_SERIALIZED_PAR_DO_OPERATION,
+        
Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root)));
     job.setMapperClass(BeamMapper.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     return job;
   }
+
+  private DoFn<Object, Object> getDoFn(PTransform<?, ?> transform) {
+    if (transform instanceof ParDo.SingleOutput) {
+      return ((ParDo.SingleOutput) transform).getFn();
+    } else {
+      return ((ParDo.MultiOutput) transform).getFn();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
new file mode 100644
index 0000000..59cee3c
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OutputReceiver that forwards each input it receives to each of a list of 
down stream
+ * ParDoOperations.
+ */
+public class OutputReceiver implements Serializable {
+  private final List<ParDoOperation> receiverParDos = new ArrayList<>();
+
+  /**
+   * Adds a new receiver that this OutputReceiver forwards to.
+   */
+  public void addOutput(ParDoOperation receiver) {
+    receiverParDos.add(receiver);
+  }
+
+  public List<ParDoOperation> getReceiverParDos() {
+    return ImmutableList.copyOf(receiverParDos);
+  }
+
+  /**
+   * Processes the element.
+   */
+  public void process(Object elem) {
+    for (ParDoOperation out : receiverParDos) {
+      if (out != null) {
+        out.process(elem);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
new file mode 100644
index 0000000..97473bb
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Operation for ParDo.
+ */
+public class ParDoOperation implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParDoOperation.class);
+
+  private final DoFn<Object, Object> doFn;
+  private final SerializedPipelineOptions options;
+  private final TupleTag<Object> mainOutputTag;
+  private final List<TupleTag<?>> sideOutputTags;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final OutputReceiver[] receivers;
+
+  private DoFnRunner<Object, Object> fnRunner;
+
+  public ParDoOperation(
+      DoFn<Object, Object> doFn,
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    this.doFn = checkNotNull(doFn, "doFn");
+    this.options = new SerializedPipelineOptions(checkNotNull(options, 
"options"));
+    this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
+    this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
+    this.windowingStrategy = checkNotNull(windowingStrategy, 
"windowingStrategy");
+    int numOutputs = 1 + sideOutputTags.size();
+    this.receivers = new OutputReceiver[numOutputs];
+    for (int i = 0; i < numOutputs; ++i) {
+      receivers[i] = new OutputReceiver();
+    }
+  }
+
+  /**
+   * Adds an input to this ParDoOperation, coming from the given output of the 
given source.
+   */
+  public void attachInput(ParDoOperation source, int outputNum) {
+    OutputReceiver fanOut = source.receivers[outputNum];
+    fanOut.addOutput(this);
+  }
+
+  /**
+   * Starts this Operation's execution.
+   *
+   * <p>Called after all successors consuming operations have been started.
+   */
+  public void start() {
+    fnRunner = DoFnRunners.simpleRunner(
+        options.getPipelineOptions(),
+        doFn,
+        NullSideInputReader.empty(),
+        new ParDoOutputManager(),
+        mainOutputTag,
+        sideOutputTags,
+        null,
+        windowingStrategy);
+    fnRunner.startBundle();
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
+        parDo.start();
+      }
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  public void process(Object elem) {
+    LOG.info("elem: {}.", elem);
+    fnRunner.processElement((WindowedValue<Object>) elem);
+  }
+
+  /**
+   * Finishes this Operation's execution.
+   *
+   * <p>Called after all predecessors producing operations have been finished.
+   */
+  public void finish() {
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
+        parDo.finish();
+      }
+    }
+    fnRunner.finishBundle();
+  }
+
+  private class ParDoOutputManager implements DoFnRunners.OutputManager {
+
+    @Nullable
+    private OutputReceiver getReceiverOrNull(TupleTag<?> tag) {
+      if (tag.equals(mainOutputTag)) {
+        return receivers[0];
+      } else if (sideOutputTags.contains(tag)) {
+        return receivers[sideOutputTags.indexOf(tag) + 1];
+      } else {
+        return null;
+      }
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> 
windowedValue) {
+      OutputReceiver receiver = getReceiverOrNull(tupleTag);
+      if (receiver != null) {
+        receiver.process(windowedValue);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
new file mode 100644
index 0000000..5c37b7c
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ *
+ * <p>{@link PipelineOptions} is not {@link Serializable}, so it is round-tripped through
+ * Jackson JSON bytes and lazily re-hydrated on the worker side.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  /** Jackson-serialized JSON form of the options; this is what actually ships. */
+  private final byte[] serializedOptions;
+
+  /** Lazily initialized copy of deserialized options. */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      createMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+  }
+
+  /**
+   * Returns the {@link PipelineOptions}, deserializing them on first access.
+   *
+   * <p>Synchronized: a single instance may be shared by multiple worker threads, and the
+   * lazy write to {@code pipelineOptions} would otherwise be an unsafe publication.
+   */
+  public synchronized PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
+
+        // Register the options so file-system schemes resolve correctly on workers.
+        FileSystems.setDefaultPipelineOptions(pipelineOptions);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+
+  /**
+   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
+   * for user specified configuration injection into the ObjectMapper. This supports user custom
+   * types on {@link PipelineOptions}.
+   */
+  private static ObjectMapper createMapper() {
+    return new ObjectMapper().registerModules(
+        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
new file mode 100644
index 0000000..c9360ac
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for translating Beam pipelines to MapReduce jobs.
+ */
+package org.apache.beam.runners.mapreduce.translation;

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index 80df3e1..f5eee28 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -78,8 +78,8 @@ public class WordCountTest {
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.read().from(input))
-        .apply(ParDo.of(new ExtractWordsFn()));
-//        .apply(Count.<String>perElement())
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
 //        .apply(MapElements.via(new FormatAsTextFn()))
 //        .apply("WriteCounts", TextIO.write().to(output));
 

Reply via email to