mr-runner: add BeamReducer and support GroupByKey.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/923190dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/923190dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/923190dc

Branch: refs/heads/mr-runner
Commit: 923190dca2426711e30e5c5fe7093e14fcbefe07
Parents: 389b02b
Author: Pei He <p...@apache.org>
Authored: Wed Jul 26 21:19:30 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/BeamMapper.java       |  13 +-
 .../mapreduce/translation/BeamReducer.java      |  68 +++++++++++
 .../runners/mapreduce/translation/Graph.java    |  36 +++---
 .../mapreduce/translation/GraphConverter.java   |  26 +++-
 .../mapreduce/translation/GraphPlanner.java     |  28 +++--
 .../GroupAlsoByWindowsParDoOperation.java       |  38 ++++++
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 120 +++++++++++++++++++
 .../mapreduce/translation/JobPrototype.java     |  46 +++++--
 .../translation/NormalParDoOperation.java       |  49 ++++++++
 .../mapreduce/translation/Operation.java        |  69 +++++++++++
 .../mapreduce/translation/OutputReceiver.java   |  12 +-
 .../mapreduce/translation/ParDoOperation.java   |  73 ++++-------
 .../mapreduce/translation/WriteOperation.java   |  52 ++++++++
 .../beam/runners/mapreduce/WordCountTest.java   |   7 --
 14 files changed, 534 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
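
For context, this is the shape of pipeline the change enables: a GroupByKey
now becomes a Hadoop shuffle, with the mapper running the upstream ParDos
plus a WriteOperation, and the reducer running a GroupAlsoByWindows ParDo.
A minimal sketch of such a pipeline (illustrative only; imports and the
MapReduceRunner option wiring are assumed):

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(KV.of("a", 1L), KV.of("a", 2L), KV.of("b", 3L)))
        .apply(GroupByKey.<String, Long>create())
        .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // One output line per key with its grouped values.
            c.output(c.element().getKey() + ": " + c.element().getValue());
          }
        }));
    p.run();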


http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index b74797d..11ecc8d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -6,8 +6,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
  * Created by peihe on 21/07/2017.
@@ -15,7 +14,7 @@ import org.slf4j.LoggerFactory;
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
 
-  public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = "beam-serialized-par-do-op";
+  public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper";
 
   private ParDoOperation parDoOperation;
 
@@ -23,11 +22,11 @@ public class BeamMapper<ValueInT, ValueOutT>
   protected void setup(
       Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
     String serializedParDo = checkNotNull(
-        context.getConfiguration().get(BEAM_SERIALIZED_PAR_DO_OPERATION),
-        BEAM_SERIALIZED_PAR_DO_OPERATION);
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
+        BEAM_PAR_DO_OPERATION_MAPPER);
     parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedParDo), "DoFn");
-    parDoOperation.start();
+        Base64.decodeBase64(serializedParDo), "ParDoOperation");
+    parDoOperation.start((TaskInputOutputContext) context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
new file mode 100644
index 0000000..8eb7938
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -0,0 +1,68 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 25/07/2017.
+ */
+public class BeamReducer<ValueInT, ValueOutT>
+    extends Reducer<Object, byte[], Object, WindowedValue<ValueOutT>> {
+
+  public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer";
+
+  private ParDoOperation parDoOperation;
+
+  @Override
+  protected void setup(
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER),
+        BEAM_PAR_DO_OPERATION_REDUCER);
+    parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "ParDoOperation");
+    parDoOperation.start((TaskInputOutputContext) context);
+  }
+
+  @Override
+  protected void reduce(
+      Object key,
+      Iterable<byte[]> values,
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    Iterable<Object> decodedValues = FluentIterable.from(values)
+        .transform(new Function<byte[], Object>() {
+          @Override
+          public Object apply(byte[] input) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(input);
+            try {
+              // TODO: setup coders.
+              return NullableCoder.of(BigEndianLongCoder.of()).decode(inStream);
+            } catch (IOException e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }
+        });
+    parDoOperation.process(
+        WindowedValue.valueInGlobalWindow(KV.of(key, decodedValues)));
+  }
+
+  @Override
+  protected void cleanup(
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    parDoOperation.finish();
+  }
+}
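
Note the "TODO: setup coders" above: the value coder is hard-coded to
NullableCoder.of(BigEndianLongCoder.of()), which only matches Long-valued
shuffles such as WordCount's counts. A possible follow-up (hypothetical
sketch, not part of this commit; the configuration key name is invented)
would ship the edge's value coder through the job configuration the same
way the ParDoOperation itself is shipped:

    // JobPrototype side (assumed key name "beam-reducer-value-coder"):
    conf.set("beam-reducer-value-coder", Base64.encodeBase64String(
        SerializableUtils.serializeToByteArray(
            NullableCoder.of(kvCoder.getValueCoder()))));

    // BeamReducer.setup() side: deserialize once instead of hard-coding.
    Coder<Object> valueCoder = (Coder<Object>) SerializableUtils.deserializeFromByteArray(
        Base64.decodeBase64(context.getConfiguration().get("beam-reducer-value-coder")),
        "Coder");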

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 6ea774b..867d1af 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -12,13 +12,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
@@ -45,10 +46,10 @@ public class Graph {
     return v;
   }
 
-  public Edge addEdge(Vertex head, Vertex tail) {
+  public Edge addEdge(Vertex head, Vertex tail, Coder<?> coder) {
     HeadTail headTail = HeadTail.of(head, tail);
     checkState(!edges.containsKey(headTail));
-    Edge e = new Edge(headTail);
+    Edge e = new Edge(headTail, coder);
     edges.put(headTail, e);
     head.addOutgoing(e);
     tail.addIncoming(e);
@@ -166,18 +167,16 @@ public class Graph {
 
   public static class Edge {
     private final HeadTail headTail;
+    private final Coder<?> coder;
     private final Set<NodePath> paths;
 
-    public static Edge of(Vertex head, Vertex tail) {
-      return of(HeadTail.of(head, tail));
+    public static Edge of(HeadTail headTail, Coder<?> coder) {
+      return new Edge(headTail, coder);
     }
 
-    public static Edge of(HeadTail headTail) {
-      return new Edge(headTail);
-    }
-
-    private Edge(HeadTail headTail) {
+    private Edge(HeadTail headTail, Coder<?> coder) {
       this.headTail = checkNotNull(headTail, "headTail");
+      this.coder = checkNotNull(coder, "coder");
       this.paths = Sets.newHashSet();
     }
 
@@ -189,6 +188,10 @@ public class Graph {
       return headTail.getTail();
     }
 
+    public Coder<?> getCoder() {
+      return coder;
+    }
+
     public Set<NodePath> getPaths() {
       return paths;
     }
@@ -204,14 +207,15 @@ public class Graph {
       }
       if (obj instanceof Edge) {
         Edge other = (Edge) obj;
-        return headTail.equals(other.headTail) && paths.equals(paths);
+        return headTail.equals(other.headTail)
+            && paths.equals(other.paths) && coder.equals(other.coder);
       }
       return false;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(headTail, paths);
+      return Objects.hash(headTail, paths, coder);
     }
 
     @Override
@@ -266,7 +270,9 @@ public class Graph {
       for (Step step : path) {
         sb.append(step.getFullName() + "|");
       }
-      sb.deleteCharAt(sb.length() - 1);
+      if (path.size() > 0) {
+        sb.deleteCharAt(sb.length() - 1);
+      }
       return sb.toString();
     }
   }
@@ -276,16 +282,18 @@ public class Graph {
     abstract String getFullName();
     // TODO: remove public
     public abstract PTransform<?, ?> getTransform();
+    abstract WindowingStrategy<?, ?> getWindowingStrategy();
     abstract List<TupleTag<?>> getInputs();
     abstract List<TupleTag<?>> getOutputs();
 
     public static Step of(
         String fullName,
         PTransform<?, ?> transform,
+        WindowingStrategy<?, ?> windowingStrategy,
         List<TupleTag<?>> inputs,
         List<TupleTag<?>> outputs) {
       return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step(
-          fullName, transform, inputs, outputs);
+          fullName, transform, windowingStrategy, inputs, outputs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index bd56ac5..e7e7598 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -6,8 +6,13 @@ import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReduceRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Pipeline translator for {@link MapReduceRunner}.
@@ -26,9 +31,12 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    WindowingStrategy<?, ?> windowingStrategy =
+        getWindowingStrategy(node.getOutputs().values().iterator().next());
     Graph.Step step = Graph.Step.of(
         node.getFullName(),
         node.getTransform(),
+        windowingStrategy,
         ImmutableList.copyOf(node.getInputs().keySet()),
         ImmutableList.copyOf(node.getOutputs().keySet()));
     Graph.Vertex v = graph.addVertex(step);
@@ -37,7 +45,9 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
       TupleTag<?> tag = pValueToTupleTag.get(pValue);
       if (outputToProducer.containsKey(tag)) {
         Graph.Vertex producer = outputToProducer.get(tag);
-        graph.addEdge(producer, v);
+
+        PCollection<?> pc = (PCollection<?>) pValue;
+        graph.addEdge(producer, v, pc.getCoder());
       }
     }
 
@@ -47,6 +57,20 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
     }
   }
 
+  private WindowingStrategy<?, ?> getWindowingStrategy(PValue pValue) {
+    if (pValue instanceof PCollection) {
+      return ((PCollection) pValue).getWindowingStrategy();
+    } else if (pValue instanceof PCollectionList) {
+      return ((PCollectionList) pValue).get(0).getWindowingStrategy();
+    } else if (pValue instanceof PCollectionTuple) {
+      return ((PCollectionTuple) pValue).getAll().values().iterator().next().getWindowingStrategy();
+    } else if (pValue instanceof PCollectionView) {
+      return ((PCollectionView) pValue).getPCollection().getWindowingStrategy();
+    } else {
+      throw new RuntimeException("Unexpected pValue type: " + pValue.getClass());
+    }
+  }
+
   public Graph getGraph() {
     return graph;
   }
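
Taken together with the Graph changes above, every edge in the translation
graph now records the Coder of the PCollection it carries, captured from
pc.getCoder() at translation time. A small illustrative use of the updated
API (the two Graph.Step values are assumed to exist):

    Graph graph = new Graph();
    Graph.Vertex readVertex = graph.addVertex(readStep);    // e.g. a Read.Bounded step
    Graph.Vertex parDoVertex = graph.addVertex(parDoStep);  // e.g. a ParDo step
    // The edge keeps the element coder so later phases (shuffle setup in
    // JobPrototype, WriteOperation) can encode without guessing.
    Graph.Edge edge = graph.addEdge(readVertex, parDoVertex, StringUtf8Coder.of());
    Coder<?> elementCoder = edge.getCoder();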

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 9198b28..9ae8365 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -2,6 +2,8 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.sdk.coders.Coder;
+
 /**
  * Created by peihe on 06/07/2017.
  */
@@ -18,6 +20,7 @@ public class GraphPlanner {
     private Graph fusedGraph;
     private Graph.Vertex workingVertex;
     private Graph.NodePath workingPath;
+    private Coder<?> workingEdgeCoder;
 
     FusionVisitor() {
       fusedGraph = new Graph();
@@ -33,7 +36,7 @@ public class GraphPlanner {
       }
       Graph.Vertex v = fusedGraph.addVertex(read.getStep());
       workingPath.addFirst(read.getStep());
-      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex, workingEdgeCoder);
       edge.addPath(workingPath);
     }
 
@@ -43,17 +46,20 @@ public class GraphPlanner {
       checkArgument(
           step.getTransform().getAdditionalInputs().isEmpty(),
           "Side inputs are not " + "supported.");
+      checkArgument(
+          parDo.getIncoming().size() == 1,
+          "Side inputs are not supported.");
+      Graph.Edge inEdge = parDo.getIncoming().iterator().next();
+
       if (workingVertex == null) {
         // Leaf vertex
         workingVertex = fusedGraph.addVertex(step);
         workingPath = new Graph.NodePath();
+        workingEdgeCoder = inEdge.getCoder();
       } else {
         workingPath.addFirst(step);
       }
-      checkArgument(
-          parDo.getIncoming().size() == 1,
-          "Side inputs are not supported.");
-      processParent(parDo.getIncoming().iterator().next().getHead());
+      processParent(inEdge.getHead());
     }
 
     @Override
@@ -66,6 +72,7 @@ public class GraphPlanner {
       for (Graph.Edge e : flatten.getIncoming()) {
         workingPath = new Graph.NodePath(basePath);
         workingVertex = baseVertex;
+        workingEdgeCoder = e.getCoder();
         processParent(e.getHead());
       }
     }
@@ -77,10 +84,17 @@ public class GraphPlanner {
       }
       Graph.Step step = groupByKey.getStep();
       Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step);
-      Graph.Edge edge = fusedGraph.addEdge(addedGroupByKey, workingVertex);
+
+      Graph.Edge edge = fusedGraph.addEdge(
+          addedGroupByKey,
+          workingVertex,
+          workingEdgeCoder);
       edge.addPath(workingPath);
+      Graph.Edge inEdge = groupByKey.getIncoming().iterator().next();
       workingVertex = addedGroupByKey;
-      processParent(groupByKey.getIncoming().iterator().next().getHead());
+      workingPath = new Graph.NodePath();
+      workingEdgeCoder = inEdge.getCoder();
+      processParent(inEdge.getHead());
     }
 
     public Graph getFusedGraph() {

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
new file mode 100644
index 0000000..1da8d26
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -0,0 +1,38 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
+
+  private final Coder<?> inputCoder;
+
+  public GroupAlsoByWindowsParDoOperation(
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Coder<?> inputCoder) {
+    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    this.inputCoder = checkNotNull(inputCoder, "inputCoder");
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn(
+        windowingStrategy,
+        SystemReduceFn.buffering(inputCoder),
+        mainOutputTag,
+        createOutputManager());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..0b8a876
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,120 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * The default batch implementation, if no specialized "fast path" implementation is applicable.
+ */
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private final TupleTag<KV<K, OutputT>> mainTag;
+  private transient DoFnRunners.OutputManager outputManager;
+
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<Object, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+      TupleTag<KV<K, OutputT>> mainTag,
+      DoFnRunners.OutputManager outputManager) {
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.reduceFn = checkNotNull(reduceFn, "reduceFn");
+    this.mainTag = checkNotNull(mainTag, "mainTag");
+    this.outputManager = checkNotNull(outputManager, "outputManager");
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+        InMemoryStateInternals.forKey(key),
+        timerInternals,
+        outputWindowedValue(),
+        NullSideInputReader.empty(),
+        reduceFn,
+        c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), 1000);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      runner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark
+      // to the first element, and fire any timers that may have been scheduled.
+      // TODO: re-enable once elements are sorted.
+      // timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    runner.persist();
+  }
+
+  private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
+    return new OutputWindowedValue<KV<K, OutputT>>() {
+      @Override
+      public void outputWindowedValue(
+          KV<K, OutputT> output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(mainTag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public <AdditionalOutputT> void outputWindowedValue(
+          TupleTag<AdditionalOutputT> tag,
+          AdditionalOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(tag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 7cdf697..34266f4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -1,6 +1,5 @@
 package org.apache.beam.runners.mapreduce.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -16,13 +15,14 @@ import java.util.Set;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
@@ -49,10 +49,15 @@ public class JobPrototype {
     Job job = new Job(conf);
     conf = job.getConfiguration();
     job.setJarByClass(jarClass);
+    conf.set(
+        "io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization," +
+            "org.apache.hadoop.io.serializer.JavaSerialization");
 
     // Setup BoundedSources in BeamInputFormat.
-    // TODO: support more than one inputs
-    Graph.Vertex head = Iterables.getOnlyElement(vertex.getIncoming()).getHead();
+    // TODO: support more than one in-edge
+    Graph.Edge inEdge = Iterables.getOnlyElement(vertex.getIncoming());
+    Graph.Vertex head = inEdge.getHead();
     Graph.Step headStep = head.getStep();
     checkState(headStep.getTransform() instanceof Read.Bounded);
     Read.Bounded read = (Read.Bounded) headStep.getTransform();
@@ -62,11 +67,10 @@ public class JobPrototype {
     job.setInputFormatClass(BeamInputFormat.class);
 
     // Setup DoFns in BeamMapper.
-    // TODO: support more than one out going edge.
-    Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing());
-    Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
+    // TODO: support more than one in-path.
+    Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths());
     List<Graph.Step> parDos = new ArrayList<>();
-    parDos.addAll(FluentIterable.from(outPath.steps())
+    parDos.addAll(FluentIterable.from(inPath.steps())
         .filter(new Predicate<Graph.Step>() {
           @Override
           public boolean apply(Graph.Step input) {
@@ -84,12 +88,12 @@ public class JobPrototype {
     ParDoOperation root = null;
     ParDoOperation prev = null;
     for (Graph.Step step : parDos) {
-      ParDoOperation current = new ParDoOperation(
+      ParDoOperation current = new NormalParDoOperation(
           getDoFn(step.getTransform()),
           PipelineOptionsFactory.create(),
           (TupleTag<Object>) step.getOutputs().iterator().next(),
           ImmutableList.<TupleTag<?>>of(),
-          WindowingStrategy.globalDefault());
+          step.getWindowingStrategy());
       if (root == null) {
         root = current;
       } else {
@@ -98,10 +102,30 @@ public class JobPrototype {
       }
       prev = current;
     }
+    // TODO: get coders from pipeline.
+    WriteOperation writeOperation = new WriteOperation(inEdge.getCoder());
+    writeOperation.attachInput(prev, 0);
     conf.set(
-        BeamMapper.BEAM_SERIALIZED_PAR_DO_OPERATION,
+        BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root)));
     job.setMapperClass(BeamMapper.class);
+
+    if (vertexStep.getTransform() instanceof GroupByKey) {
+      // Setup BeamReducer
+      ParDoOperation operation = new GroupAlsoByWindowsParDoOperation(
+          PipelineOptionsFactory.create(),
+          (TupleTag<Object>) vertexStep.getOutputs().iterator().next(),
+          ImmutableList.<TupleTag<?>>of(),
+          vertexStep.getWindowingStrategy(),
+          inEdge.getCoder());
+      // TODO: handle the map output key type.
+      job.setMapOutputKeyClass(BytesWritable.class);
+      job.setMapOutputValueClass(byte[].class);
+      conf.set(
+          BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
+          Base64.encodeBase64String(SerializableUtils.serializeToByteArray(operation)));
+      job.setReducerClass(BeamReducer.class);
+    }
     job.setOutputFormatClass(NullOutputFormat.class);
     return job;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
new file mode 100644
index 0000000..1da39a9
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class NormalParDoOperation extends ParDoOperation {
+
+  private final DoFn<Object, Object> doFn;
+
+  public NormalParDoOperation(
+      DoFn<Object, Object> doFn,
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    this.doFn = checkNotNull(doFn, "doFn");
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return doFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
new file mode 100644
index 0000000..5700e89
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -0,0 +1,69 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public abstract class Operation implements Serializable {
+  private final OutputReceiver[] receivers;
+
+  public Operation(int numOutputs) {
+    this.receivers = new OutputReceiver[numOutputs];
+    for (int i = 0; i < numOutputs; ++i) {
+      receivers[i] = new OutputReceiver();
+    }
+  }
+
+  /**
+   * Starts this Operation's execution.
+   *
+   * <p>Called after all successor consuming operations have been started.
+   */
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.start(taskContext);
+      }
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  public abstract void process(Object elem);
+
+  /**
+   * Finishes this Operation's execution.
+   *
+   * <p>Called after all predecessor producing operations have been finished.
+   */
+  public void finish() {
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.finish();
+      }
+    }
+  }
+
+  public List<OutputReceiver> getOutputReceivers() {
+    return ImmutableList.copyOf(receivers);
+  }
+
+  /**
+   * Adds an input to this Operation, coming from the given output of the given source.
+   */
+  public void attachInput(Operation source, int outputNum) {
+    OutputReceiver fanOut = source.receivers[outputNum];
+    fanOut.addOutput(this);
+  }
+}
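
Operation is the new base class for everything that runs inside a Hadoop
task: ParDoOperation and WriteOperation extend it, and OutputReceiver chains
instances together. An illustrative wiring, mirroring what JobPrototype.build()
does above (doFn, mainTag, kvCoder and taskContext are assumed to be in
scope):

    ParDoOperation parDo = new NormalParDoOperation(
        doFn,
        PipelineOptionsFactory.create(),
        mainTag,
        ImmutableList.<TupleTag<?>>of(),
        WindowingStrategy.globalDefault());
    WriteOperation write = new WriteOperation(kvCoder);
    write.attachInput(parDo, 0);  // write consumes parDo's main output (receiver 0)
    parDo.start(taskContext);     // start() recurses into downstream operations
    parDo.process(WindowedValue.valueInGlobalWindow(KV.of("a", 1L)));
    parDo.finish();               // finish() recurses downstream, then closes the bundle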

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
index 59cee3c..3347672 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -27,24 +27,24 @@ import java.util.List;
  * ParDoOperations.
  */
 public class OutputReceiver implements Serializable {
-  private final List<ParDoOperation> receiverParDos = new ArrayList<>();
+  private final List<Operation> receivingOperations = new ArrayList<>();
 
   /**
    * Adds a new receiver that this OutputReceiver forwards to.
    */
-  public void addOutput(ParDoOperation receiver) {
-    receiverParDos.add(receiver);
+  public void addOutput(Operation receiver) {
+    receivingOperations.add(receiver);
   }
 
-  public List<ParDoOperation> getReceiverParDos() {
-    return ImmutableList.copyOf(receiverParDos);
+  public List<Operation> getReceivingOperations() {
+    return ImmutableList.copyOf(receivingOperations);
   }
 
   /**
    * Processes the element.
    */
   public void process(Object elem) {
-    for (ParDoOperation out : receiverParDos) {
+    for (Operation out : receivingOperations) {
       if (out != null) {
         out.process(elem);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 97473bb..2627d20 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -30,109 +29,83 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Operation for ParDo.
  */
-public class ParDoOperation implements Serializable {
+public abstract class ParDoOperation extends Operation {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class);
 
-  private final DoFn<Object, Object> doFn;
-  private final SerializedPipelineOptions options;
-  private final TupleTag<Object> mainOutputTag;
+  protected final SerializedPipelineOptions options;
+  protected final TupleTag<Object> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final OutputReceiver[] receivers;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
 
   private DoFnRunner<Object, Object> fnRunner;
 
   public ParDoOperation(
-      DoFn<Object, Object> doFn,
       PipelineOptions options,
       TupleTag<Object> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
-    this.doFn = checkNotNull(doFn, "doFn");
+    super(1 + sideOutputTags.size());
     this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
     this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
-    int numOutputs = 1 + sideOutputTags.size();
-    this.receivers = new OutputReceiver[numOutputs];
-    for (int i = 0; i < numOutputs; ++i) {
-      receivers[i] = new OutputReceiver();
-    }
   }
 
   /**
-   * Adds an input to this ParDoOperation, coming from the given output of the given source.
+   * Returns a {@link DoFn} for processing inputs.
    */
-  public void attachInput(ParDoOperation source, int outputNum) {
-    OutputReceiver fanOut = source.receivers[outputNum];
-    fanOut.addOutput(this);
-  }
+  abstract DoFn<Object, Object> getDoFn();
 
-  /**
-   * Starts this Operation's execution.
-   *
-   * <p>Called after all successors consuming operations have been started.
-   */
-  public void start() {
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
-        doFn,
+        getDoFn(),
         NullSideInputReader.empty(),
-        new ParDoOutputManager(),
+        createOutputManager(),
         mainOutputTag,
         sideOutputTags,
         null,
         windowingStrategy);
     fnRunner.startBundle();
-    for (OutputReceiver receiver : receivers) {
-      if (receiver == null) {
-        continue;
-      }
-      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
-        parDo.start();
-      }
-    }
+    super.start(taskContext);
   }
 
   /**
    * Processes the element.
    */
+  @Override
   public void process(Object elem) {
     LOG.info("elem: {}.", elem);
     fnRunner.processElement((WindowedValue<Object>) elem);
   }
 
-  /**
-   * Finishes this Operation's execution.
-   *
-   * <p>Called after all predecessors producing operations have been finished.
-   */
+  @Override
   public void finish() {
-    for (OutputReceiver receiver : receivers) {
-      if (receiver == null) {
-        continue;
-      }
-      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
-        parDo.finish();
-      }
-    }
+    super.finish();
     fnRunner.finishBundle();
   }
 
+  protected DoFnRunners.OutputManager createOutputManager() {
+    return new ParDoOutputManager();
+  }
+
   private class ParDoOutputManager implements DoFnRunners.OutputManager {
 
     @Nullable
     private OutputReceiver getReceiverOrNull(TupleTag<?> tag) {
+      List<OutputReceiver> receivers = getOutputReceivers();
       if (tag.equals(mainOutputTag)) {
-        return receivers[0];
+        return receivers.get(0);
       } else if (sideOutputTags.contains(tag)) {
-        return receivers[sideOutputTags.indexOf(tag) + 1];
+        return receivers.get(sideOutputTags.indexOf(tag) + 1);
       } else {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
new file mode 100644
index 0000000..97201d0
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
@@ -0,0 +1,52 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class WriteOperation extends Operation {
+
+  private final Coder<Object> keyCoder;
+  private final Coder<Object> nullableValueCoder;
+
+  private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+
+  public WriteOperation(Coder<?> coder) {
+    super(0);
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder");
+    this.keyCoder = kvCoder.getKeyCoder();
+    this.nullableValueCoder = NullableCoder.of(kvCoder.getValueCoder());
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  @Override
+  public void process(Object elem) {
+    WindowedValue<KV<?, ?>> windowedElem = (WindowedValue<KV<?, ?>>) elem;
+    try {
+      ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+      keyCoder.encode(windowedElem.getValue().getKey(), keyStream);
+
+      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+      nullableValueCoder.encode(windowedElem.getValue().getValue(), valueStream);
+      taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray());
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+}
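
WriteOperation is the map-side half of the shuffle encoding that BeamReducer
undoes on the reduce side: the key goes through the KvCoder's key coder into
a BytesWritable, the value through NullableCoder over the value coder into a
raw byte[] (which is why JobPrototype registers JavaSerialization alongside
WritableSerialization). A self-contained round trip of the value encoding
(illustrative sketch; exception handling elided):

    KvCoder<String, Long> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
    NullableCoder<Long> valueCoder = NullableCoder.of(kvCoder.getValueCoder());

    // Encode, as WriteOperation does before taskContext.write(...).
    ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
    valueCoder.encode(42L, valueStream);

    // Decode, as BeamReducer does in reduce(...).
    Long decoded = valueCoder.decode(new ByteArrayInputStream(valueStream.toByteArray()));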

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index f5eee28..5fa499a 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -4,19 +4,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.log4j.BasicConfigurator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
