mr-runner: add BeamReducer and support GroupByKey.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/923190dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/923190dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/923190dc

Branch: refs/heads/mr-runner
Commit: 923190dca2426711e30e5c5fe7093e14fcbefe07
Parents: 389b02b
Author: Pei He <p...@apache.org>
Authored: Wed Jul 26 21:19:30 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/BeamMapper.java       |  13 +-
 .../mapreduce/translation/BeamReducer.java      |  68 +++++++++++
 .../runners/mapreduce/translation/Graph.java    |  36 +++---
 .../mapreduce/translation/GraphConverter.java   |  26 +++-
 .../mapreduce/translation/GraphPlanner.java     |  28 +++--
 .../GroupAlsoByWindowsParDoOperation.java       |  38 ++++++
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 120 +++++++++++++++++++
 .../mapreduce/translation/JobPrototype.java     |  46 +++++--
 .../translation/NormalParDoOperation.java       |  49 ++++++++
 .../mapreduce/translation/Operation.java        |  69 +++++++++++
 .../mapreduce/translation/OutputReceiver.java   |  12 +-
 .../mapreduce/translation/ParDoOperation.java   |  73 ++++------
 .../mapreduce/translation/WriteOperation.java   |  52 ++++++++
 .../beam/runners/mapreduce/WordCountTest.java   |   7 --
 14 files changed, 534 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index b74797d..11ecc8d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -6,8 +6,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
  * Created by peihe on 21/07/2017.
@@ -15,7 +14,7 @@ import org.slf4j.LoggerFactory;
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
 
-  public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = "beam-serialized-par-do-op";
+  public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper";
 
   private ParDoOperation parDoOperation;
 
@@ -23,11 +22,11 @@ public class BeamMapper<ValueInT, ValueOutT>
   protected void setup(
       Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) {
     String serializedParDo = checkNotNull(
-        context.getConfiguration().get(BEAM_SERIALIZED_PAR_DO_OPERATION),
-        BEAM_SERIALIZED_PAR_DO_OPERATION);
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
+        BEAM_PAR_DO_OPERATION_MAPPER);
     parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(serializedParDo), "DoFn");
-    parDoOperation.start();
+        Base64.decodeBase64(serializedParDo), "ParDoOperation");
+    parDoOperation.start((TaskInputOutputContext) context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
new file mode 100644
index 0000000..8eb7938
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -0,0 +1,68 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 25/07/2017.
+ */
+public class BeamReducer<ValueInT, ValueOutT>
+    extends Reducer<Object, byte[], Object, WindowedValue<ValueOutT>> {
+
+  public static final String BEAM_PAR_DO_OPERATION_REDUCER = "beam-par-do-op-reducer";
+
+  private ParDoOperation parDoOperation;
+
+  @Override
+  protected void setup(
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    String serializedParDo = checkNotNull(
+        context.getConfiguration().get(BEAM_PAR_DO_OPERATION_REDUCER),
+        BEAM_PAR_DO_OPERATION_REDUCER);
+    parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedParDo), "ParDoOperation");
+    parDoOperation.start((TaskInputOutputContext) context);
+  }
+
+  @Override
+  protected void reduce(
+      Object key,
+      Iterable<byte[]> values,
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    Iterable<Object> decodedValues = FluentIterable.from(values)
+        .transform(new Function<byte[], Object>() {
+          @Override
+          public Object apply(byte[] input) {
+            ByteArrayInputStream inStream = new ByteArrayInputStream(input);
+            try {
+              // TODO: setup coders.
+              return NullableCoder.of(BigEndianLongCoder.of()).decode(inStream);
+            } catch (IOException e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }
+        });
+    parDoOperation.process(
+        WindowedValue.valueInGlobalWindow(KV.of(key, decodedValues)));
+  }
+
+  @Override
+  protected void cleanup(
+      Reducer<Object, byte[], Object, WindowedValue<ValueOutT>>.Context context) {
+    parDoOperation.finish();
+  }
+}
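
How the fused operations reach the tasks: the job builder serializes the root
ParDoOperation, Base64-encodes it into the Hadoop Configuration, and
BeamMapper/BeamReducer decode it back in setup(). A minimal sketch of that
round trip, with a String standing in for a real ParDoOperation chain (which
is likewise java.io.Serializable); the sketch is illustrative, not code from
this commit:

    import org.apache.beam.sdk.util.SerializableUtils;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.conf.Configuration;

    public class ConfigRoundTrip {
      public static void main(String[] args) {
        java.io.Serializable op = "fused-operation-chain";  // stand-in payload

        // Job-submission side: serialize and stash in the job Configuration.
        Configuration conf = new Configuration();
        conf.set("beam-par-do-op-reducer",
            Base64.encodeBase64String(SerializableUtils.serializeToByteArray(op)));

        // Task side, i.e. what setup() does: read it back and deserialize.
        Object restored = SerializableUtils.deserializeFromByteArray(
            Base64.decodeBase64(conf.get("beam-par-do-op-reducer")),
            "ParDoOperation");
        System.out.println(restored);  // prints: fused-operation-chain
      }
    }
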
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 6ea774b..867d1af 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -12,13 +12,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
@@ -45,10 +46,10 @@ public class Graph {
     return v;
   }
 
-  public Edge addEdge(Vertex head, Vertex tail) {
+  public Edge addEdge(Vertex head, Vertex tail, Coder<?> coder) {
     HeadTail headTail = HeadTail.of(head, tail);
     checkState(!edges.containsKey(headTail));
-    Edge e = new Edge(headTail);
+    Edge e = new Edge(headTail, coder);
     edges.put(headTail, e);
     head.addOutgoing(e);
     tail.addIncoming(e);
@@ -166,18 +167,16 @@ public class Graph {
   public static class Edge {
     private final HeadTail headTail;
+    private final Coder<?> coder;
     private final Set<NodePath> paths;
 
-    public static Edge of(Vertex head, Vertex tail) {
-      return of(HeadTail.of(head, tail));
+    public static Edge of(HeadTail headTail, Coder<?> coder) {
+      return new Edge(headTail, coder);
     }
 
-    public static Edge of(HeadTail headTail) {
-      return new Edge(headTail);
-    }
-
-    private Edge(HeadTail headTail) {
+    private Edge(HeadTail headTail, Coder<?> coder) {
       this.headTail = checkNotNull(headTail, "headTail");
+      this.coder = checkNotNull(coder, "coder");
       this.paths = Sets.newHashSet();
     }
 
@@ -189,6 +188,10 @@ public class Graph {
       return headTail.getTail();
     }
 
+    public Coder<?> getCoder() {
+      return coder;
+    }
+
     public Set<NodePath> getPaths() {
       return paths;
     }
@@ -204,14 +207,15 @@
       }
       if (obj instanceof Edge) {
         Edge other = (Edge) obj;
-        return headTail.equals(other.headTail) && paths.equals(paths);
+        return headTail.equals(other.headTail)
+            && paths.equals(other.paths) && coder.equals(other.coder);
       }
       return false;
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(headTail, paths);
+      return Objects.hash(headTail, paths, coder);
     }
 
     @Override
@@ -266,7 +270,9 @@ public class Graph {
       for (Step step : path) {
        sb.append(step.getFullName() + "|");
       }
-      sb.deleteCharAt(sb.length() - 1);
+      if (path.size() > 0) {
+        sb.deleteCharAt(sb.length() - 1);
+      }
       return sb.toString();
     }
   }
 
@@ -276,16 +282,18 @@
     abstract String getFullName();
     // TODO: remove public
     public abstract PTransform<?, ?> getTransform();
+    abstract WindowingStrategy<?, ?> getWindowingStrategy();
     abstract List<TupleTag<?>> getInputs();
     abstract List<TupleTag<?>> getOutputs();
 
     public static Step of(
         String fullName,
         PTransform<?, ?> transform,
+        WindowingStrategy<?, ?> windowingStrategy,
         List<TupleTag<?>> inputs,
         List<TupleTag<?>> outputs) {
       return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step(
-          fullName, transform, inputs, outputs);
+          fullName, transform, windowingStrategy, inputs, outputs);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index bd56ac5..e7e7598 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -6,8 +6,13 @@ import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReduceRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Pipeline translator for {@link MapReduceRunner}.
@@ -26,9 +31,12 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    WindowingStrategy<?, ?> windowingStrategy =
+        getWindowingStrategy(node.getOutputs().values().iterator().next());
     Graph.Step step = Graph.Step.of(
         node.getFullName(),
         node.getTransform(),
+        windowingStrategy,
         ImmutableList.copyOf(node.getInputs().keySet()),
         ImmutableList.copyOf(node.getOutputs().keySet()));
     Graph.Vertex v = graph.addVertex(step);
@@ -37,7 +45,9 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
       TupleTag<?> tag = pValueToTupleTag.get(pValue);
       if (outputToProducer.containsKey(tag)) {
         Graph.Vertex producer = outputToProducer.get(tag);
-        graph.addEdge(producer, v);
+
+        PCollection<?> pc = (PCollection<?>) pValue;
+        graph.addEdge(producer, v, pc.getCoder());
       }
     }
 
@@ -47,6 +57,20 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
     }
   }
 
+  private WindowingStrategy<?, ?> getWindowingStrategy(PValue pValue) {
+    if (pValue instanceof PCollection) {
+      return ((PCollection) pValue).getWindowingStrategy();
+    } else if (pValue instanceof PCollectionList) {
+      return ((PCollectionList) pValue).get(0).getWindowingStrategy();
+    } else if (pValue instanceof PCollectionTuple) {
+      return ((PCollectionTuple) pValue).getAll().values().iterator().next().getWindowingStrategy();
+    } else if (pValue instanceof PCollectionView) {
+      return ((PCollectionView) pValue).getPCollection().getWindowingStrategy();
+    } else {
+      throw new RuntimeException("Unexpected pValue type: " + pValue.getClass());
+    }
+  }
+
   public Graph getGraph() {
     return graph;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 9198b28..9ae8365 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -2,6 +2,8 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.sdk.coders.Coder;
+
 /**
  * Created by peihe on 06/07/2017.
 */
@@ -18,6 +20,7 @@ public class GraphPlanner {
     private Graph fusedGraph;
     private Graph.Vertex workingVertex;
     private Graph.NodePath workingPath;
+    private Coder<?> workingEdgeCoder;
 
     FusionVisitor() {
       fusedGraph = new Graph();
@@ -33,7 +36,7 @@
       }
       Graph.Vertex v = fusedGraph.addVertex(read.getStep());
       workingPath.addFirst(read.getStep());
-      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
+      Graph.Edge edge = fusedGraph.addEdge(v, workingVertex, workingEdgeCoder);
       edge.addPath(workingPath);
     }
 
@@ -43,17 +46,20 @@
       checkArgument(
           step.getTransform().getAdditionalInputs().isEmpty(),
           "Side inputs are not " + "supported.");
+      checkArgument(
+          parDo.getIncoming().size() == 1,
+          "Side inputs are not supported.");
+      Graph.Edge inEdge = parDo.getIncoming().iterator().next();
+
       if (workingVertex == null) {
         // Leaf vertex
         workingVertex = fusedGraph.addVertex(step);
         workingPath = new Graph.NodePath();
+        workingEdgeCoder = inEdge.getCoder();
       } else {
         workingPath.addFirst(step);
       }
-      checkArgument(
-          parDo.getIncoming().size() == 1,
-          "Side inputs are not supported.");
-      processParent(parDo.getIncoming().iterator().next().getHead());
+      processParent(inEdge.getHead());
     }
 
     @Override
@@ -66,6 +72,7 @@
       for (Graph.Edge e : flatten.getIncoming()) {
         workingPath = new Graph.NodePath(basePath);
         workingVertex = baseVertex;
+        workingEdgeCoder = e.getCoder();
         processParent(e.getHead());
       }
     }
@@ -77,10 +84,17 @@
       }
       Graph.Step step = groupByKey.getStep();
       Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step);
-      Graph.Edge edge = fusedGraph.addEdge(addedGroupByKey, workingVertex);
+
+      Graph.Edge edge = fusedGraph.addEdge(
+          addedGroupByKey,
+          workingVertex,
+          workingEdgeCoder);
       edge.addPath(workingPath);
+      Graph.Edge inEdge = groupByKey.getIncoming().iterator().next();
       workingVertex = addedGroupByKey;
-      processParent(groupByKey.getIncoming().iterator().next().getHead());
+      workingPath = new Graph.NodePath();
+      workingEdgeCoder = inEdge.getCoder();
+      processParent(inEdge.getHead());
     }
 
     public Graph getFusedGraph() {
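
For orientation, a sketch of what FusionVisitor produces for a linear
Read -> ParDo -> ParDo -> GroupByKey pipeline, written against the Graph API
in this diff. Here the Step values and coder are supplied by the caller; in
the runner, the visitor derives them while walking the graph tail-to-head.
Treat this as an illustration, not code from the commit:

    import org.apache.beam.sdk.coders.Coder;

    class FusionSketch {
      static Graph fuse(Graph.Step read, Graph.Step parDoA, Graph.Step parDoB,
          Graph.Step gbk, Coder<?> kvCoder) {
        Graph fused = new Graph();
        Graph.Vertex readVertex = fused.addVertex(read);
        Graph.Vertex gbkVertex = fused.addVertex(gbk);
        // The intermediate ParDos do not become vertices; they ride on the
        // edge as a NodePath. The edge also carries the coder the mapper
        // will use to materialize its output.
        Graph.Edge edge = fused.addEdge(readVertex, gbkVertex, kvCoder);
        Graph.NodePath path = new Graph.NodePath();
        path.addFirst(parDoB);  // steps are prepended while walking backwards
        path.addFirst(parDoA);
        edge.addPath(path);
        return fused;
      }
    }
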
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
new file mode 100644
index 0000000..1da8d26
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -0,0 +1,38 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
+
+  private final Coder<?> inputCoder;
+
+  public GroupAlsoByWindowsParDoOperation(
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Coder<?> inputCoder) {
+    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    this.inputCoder = checkNotNull(inputCoder, "inputCoder");
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return new GroupAlsoByWindowsViaOutputBufferDoFn(
+        windowingStrategy,
+        SystemReduceFn.buffering(inputCoder),
+        mainOutputTag,
+        createOutputManager());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
new file mode 100644
index 0000000..0b8a876
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -0,0 +1,120 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * The default batch implementation, if no specialized "fast path" implementation is applicable.
+ */
+public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
+    extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
+
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private final TupleTag<KV<K, OutputT>> mainTag;
+  private transient DoFnRunners.OutputManager outputManager;
+
+  public GroupAlsoByWindowsViaOutputBufferDoFn(
+      WindowingStrategy<Object, W> windowingStrategy,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+      TupleTag<KV<K, OutputT>> mainTag,
+      DoFnRunners.OutputManager outputManager) {
+    this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
+    this.reduceFn = checkNotNull(reduceFn, "reduceFn");
+    this.mainTag = checkNotNull(mainTag, "mainTag");
+    this.outputManager = checkNotNull(outputManager, "outputManager");
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+        InMemoryStateInternals.forKey(key),
+        timerInternals,
+        outputWindowedValue(),
+        NullSideInputReader.empty(),
+        reduceFn,
+        c.getPipelineOptions());
+
+    Iterable<List<WindowedValue<InputT>>> chunks =
+        Iterables.partition(c.element().getValue(), 1000);
+    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
+      // Process the chunk of elements.
+      runner.processElements(chunk);
+
+      // Then, since elements are sorted by their timestamp, advance the input watermark
+      // to the first element, and fire any timers that may have been scheduled.
+      // TODO: re-enable once elements are sorted.
+      // timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
+
+      // Fire any processing timers that need to fire
+      timerInternals.advanceProcessingTime(Instant.now());
+
+      // Leave the output watermark undefined. Since there's no late data in batch mode
+      // there's really no need to track it as we do for streaming.
+    }
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    runner.persist();
+  }
+
+  private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
+    return new OutputWindowedValue<KV<K, OutputT>>() {
+      @Override
+      public void outputWindowedValue(
+          KV<K, OutputT> output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(mainTag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public <AdditionalOutputT> void outputWindowedValue(
+          TupleTag<AdditionalOutputT> tag,
+          AdditionalOutputT output,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        outputManager.output(tag,
+            WindowedValue.of(output, timestamp, windows, pane));
+      }
+    };
+  }
+}
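
The DoFn above hands the grouped values to ReduceFnRunner in chunks of 1000
via Guava's Iterables.partition, so an arbitrarily large per-key Iterable is
never materialized at once. The same pattern in isolation (standalone demo,
chunk size shrunk to 3):

    import com.google.common.collect.Iterables;
    import java.util.Arrays;
    import java.util.List;

    public class ChunkedIterationDemo {
      public static void main(String[] args) {
        Iterable<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        // Iterate lazily in fixed-size chunks instead of all at once.
        for (List<Integer> chunk : Iterables.partition(values, 3)) {
          System.out.println(chunk);  // [1, 2, 3] then [4, 5, 6] then [7]
        }
      }
    }
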
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 7cdf697..34266f4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -1,6 +1,5 @@
 package org.apache.beam.runners.mapreduce.translation;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -16,13 +15,14 @@ import java.util.Set;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
@@ -49,10 +49,15 @@ public class JobPrototype {
     Job job = new Job(conf);
     conf = job.getConfiguration();
     job.setJarByClass(jarClass);
+    conf.set(
+        "io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization,"
+            + "org.apache.hadoop.io.serializer.JavaSerialization");
 
     // Setup BoundedSources in BeamInputFormat.
-    // TODO: support more than one inputs
-    Graph.Vertex head = Iterables.getOnlyElement(vertex.getIncoming()).getHead();
+    // TODO: support more than one in-edge
+    Graph.Edge inEdge = Iterables.getOnlyElement(vertex.getIncoming());
+    Graph.Vertex head = inEdge.getHead();
     Graph.Step headStep = head.getStep();
     checkState(headStep.getTransform() instanceof Read.Bounded);
     Read.Bounded read = (Read.Bounded) headStep.getTransform();
@@ -62,11 +67,10 @@ public class JobPrototype {
     job.setInputFormatClass(BeamInputFormat.class);
 
     // Setup DoFns in BeamMapper.
-    // TODO: support more than one out going edge.
-    Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing());
-    Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
+    // TODO: support more than one in-path.
+    Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths());
     List<Graph.Step> parDos = new ArrayList<>();
-    parDos.addAll(FluentIterable.from(outPath.steps())
+    parDos.addAll(FluentIterable.from(inPath.steps())
         .filter(new Predicate<Graph.Step>() {
           @Override
           public boolean apply(Graph.Step input) {
@@ -84,12 +88,12 @@
     ParDoOperation root = null;
     ParDoOperation prev = null;
     for (Graph.Step step : parDos) {
-      ParDoOperation current = new ParDoOperation(
+      ParDoOperation current = new NormalParDoOperation(
           getDoFn(step.getTransform()),
           PipelineOptionsFactory.create(),
           (TupleTag<Object>) step.getOutputs().iterator().next(),
           ImmutableList.<TupleTag<?>>of(),
-          WindowingStrategy.globalDefault());
+          step.getWindowingStrategy());
       if (root == null) {
         root = current;
       } else {
@@ -98,10 +102,30 @@
       }
       prev = current;
     }
+    // TODO: get coders from pipeline.
+    WriteOperation writeOperation = new WriteOperation(inEdge.getCoder());
+    writeOperation.attachInput(prev, 0);
     conf.set(
-        BeamMapper.BEAM_SERIALIZED_PAR_DO_OPERATION,
+        BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root)));
     job.setMapperClass(BeamMapper.class);
+
+    if (vertexStep.getTransform() instanceof GroupByKey) {
+      // Setup BeamReducer
+      ParDoOperation operation = new GroupAlsoByWindowsParDoOperation(
+          PipelineOptionsFactory.create(),
+          (TupleTag<Object>) vertexStep.getOutputs().iterator().next(),
+          ImmutableList.<TupleTag<?>>of(),
+          vertexStep.getWindowingStrategy(),
+          inEdge.getCoder());
+      // TODO: handle the map output key type.
+      job.setMapOutputKeyClass(BytesWritable.class);
+      job.setMapOutputValueClass(byte[].class);
+      conf.set(
+          BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
+          Base64.encodeBase64String(SerializableUtils.serializeToByteArray(operation)));
+      job.setReducerClass(BeamReducer.class);
+    }
     job.setOutputFormatClass(NullOutputFormat.class);
     return job;
   }
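
Condensed, the Hadoop job assembled above for a GroupByKey vertex looks like
the following restatement. The serialized operation payloads are passed in as
parameters here; the real method derives them from the fused graph, so this
is a readability sketch rather than the commit's code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    class GbkJobSketch {
      static Job sketch(Configuration conf, String mapperOps, String reducerOps)
          throws Exception {
        Job job = new Job(conf);
        job.getConfiguration().set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization,"
                + "org.apache.hadoop.io.serializer.JavaSerialization");
        // Map side: the fused ParDo chain, ending in a WriteOperation that
        // emits coder-encoded key/value bytes.
        job.getConfiguration().set(BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER, mapperOps);
        job.setMapperClass(BeamMapper.class);
        job.setMapOutputKeyClass(BytesWritable.class);
        job.setMapOutputValueClass(byte[].class);
        // Reduce side: the GroupAlsoByWindows operation.
        job.getConfiguration().set(BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER, reducerOps);
        job.setReducerClass(BeamReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
      }
    }
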
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
new file mode 100644
index 0000000..1da39a9
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class NormalParDoOperation extends ParDoOperation {
+
+  private final DoFn<Object, Object> doFn;
+
+  public NormalParDoOperation(
+      DoFn<Object, Object> doFn,
+      PipelineOptions options,
+      TupleTag<Object> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    this.doFn = checkNotNull(doFn, "doFn");
+  }
+
+  @Override
+  DoFn<Object, Object> getDoFn() {
+    return doFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
new file mode 100644
index 0000000..5700e89
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -0,0 +1,69 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public abstract class Operation implements Serializable {
+  private final OutputReceiver[] receivers;
+
+  public Operation(int numOutputs) {
+    this.receivers = new OutputReceiver[numOutputs];
+    for (int i = 0; i < numOutputs; ++i) {
+      receivers[i] = new OutputReceiver();
+    }
+  }
+
+  /**
+   * Starts this Operation's execution.
+   *
+   * <p>Called after all successors consuming operations have been started.
+   */
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.start(taskContext);
+      }
+    }
+  }
+
+  /**
+   * Processes the element.
+   */
+  public abstract void process(Object elem);
+
+  /**
+   * Finishes this Operation's execution.
+   *
+   * <p>Called after all predecessors producing operations have been finished.
+   */
+  public void finish() {
+    for (OutputReceiver receiver : receivers) {
+      if (receiver == null) {
+        continue;
+      }
+      for (Operation operation : receiver.getReceivingOperations()) {
+        operation.finish();
+      }
+    }
+  }
+
+  public List<OutputReceiver> getOutputReceivers() {
+    return ImmutableList.copyOf(receivers);
+  }
+
+  /**
+   * Adds an input to this ParDoOperation, coming from the given output of the given source.
+   */
+  public void attachInput(Operation source, int outputNum) {
+    OutputReceiver fanOut = source.receivers[outputNum];
+    fanOut.addOutput(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
index 59cee3c..3347672 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -27,24 +27,24 @@ import java.util.List;
  * ParDoOperations.
  */
 public class OutputReceiver implements Serializable {
-  private final List<ParDoOperation> receiverParDos = new ArrayList<>();
+  private final List<Operation> receivingOperations = new ArrayList<>();
 
   /**
    * Adds a new receiver that this OutputReceiver forwards to.
    */
-  public void addOutput(ParDoOperation receiver) {
-    receiverParDos.add(receiver);
+  public void addOutput(Operation receiver) {
+    receivingOperations.add(receiver);
   }
 
-  public List<ParDoOperation> getReceiverParDos() {
-    return ImmutableList.copyOf(receiverParDos);
+  public List<Operation> getReceivingOperations() {
+    return ImmutableList.copyOf(receivingOperations);
   }
 
   /**
    * Processes the element.
    */
   public void process(Object elem) {
-    for (ParDoOperation out : receiverParDos) {
+    for (Operation out : receivingOperations) {
       if (out != null) {
         out.process(elem);
       }
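
Operations now form a push-based chain: every Operation owns one
OutputReceiver per output, and attachInput() subscribes a downstream Operation
to one of an upstream Operation's receivers. A toy illustration against the
API above (PrintOperation is hypothetical, not part of this commit):

    class PrintOperation extends Operation {
      PrintOperation() {
        super(1);  // one output
      }

      @Override
      public void process(Object elem) {
        System.out.println(elem);
        // Forward to whatever has been attached downstream.
        getOutputReceivers().get(0).process(elem);
      }
    }

    // Wiring and lifecycle, as BeamMapper drives it:
    //   PrintOperation first = new PrintOperation();
    //   PrintOperation second = new PrintOperation();
    //   second.attachInput(first, 0);  // second consumes first's output 0
    //   first.start(taskContext);      // starts the whole chain
    //   first.process("hello");        // printed by first, then by second
    //   first.finish();
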
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 97473bb..2627d20 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -30,109 +29,83 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Operation for ParDo.
  */
-public class ParDoOperation implements Serializable {
+public abstract class ParDoOperation extends Operation {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class);
 
-  private final DoFn<Object, Object> doFn;
-  private final SerializedPipelineOptions options;
-  private final TupleTag<Object> mainOutputTag;
+  protected final SerializedPipelineOptions options;
+  protected final TupleTag<Object> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final OutputReceiver[] receivers;
+  protected final WindowingStrategy<?, ?> windowingStrategy;
 
   private DoFnRunner<Object, Object> fnRunner;
 
   public ParDoOperation(
-      DoFn<Object, Object> doFn,
       PipelineOptions options,
       TupleTag<Object> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
      WindowingStrategy<?, ?> windowingStrategy) {
-    this.doFn = checkNotNull(doFn, "doFn");
+    super(1 + sideOutputTags.size());
     this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
     this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
-    int numOutputs = 1 + sideOutputTags.size();
-    this.receivers = new OutputReceiver[numOutputs];
-    for (int i = 0; i < numOutputs; ++i) {
-      receivers[i] = new OutputReceiver();
-    }
   }
 
   /**
-   * Adds an input to this ParDoOperation, coming from the given output of the given source.
+   * Returns a {@link DoFn} for processing inputs.
    */
-  public void attachInput(ParDoOperation source, int outputNum) {
-    OutputReceiver fanOut = source.receivers[outputNum];
-    fanOut.addOutput(this);
-  }
+  abstract DoFn<Object, Object> getDoFn();
 
-  /**
-   * Starts this Operation's execution.
-   *
-   * <p>Called after all successors consuming operations have been started.
-   */
-  public void start() {
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
-        doFn,
+        getDoFn(),
         NullSideInputReader.empty(),
-        new ParDoOutputManager(),
+        createOutputManager(),
         mainOutputTag,
         sideOutputTags,
         null,
         windowingStrategy);
     fnRunner.startBundle();
-    for (OutputReceiver receiver : receivers) {
-      if (receiver == null) {
-        continue;
-      }
-      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
-        parDo.start();
-      }
-    }
+    super.start(taskContext);
   }
 
   /**
    * Processes the element.
    */
+  @Override
   public void process(Object elem) {
     LOG.info("elem: {}.", elem);
     fnRunner.processElement((WindowedValue<Object>) elem);
   }
 
-  /**
-   * Finishes this Operation's execution.
-   *
-   * <p>Called after all predecessors producing operations have been finished.
-   */
+  @Override
   public void finish() {
-    for (OutputReceiver receiver : receivers) {
-      if (receiver == null) {
-        continue;
-      }
-      for (ParDoOperation parDo : receiver.getReceiverParDos()) {
-        parDo.finish();
-      }
-    }
+    super.finish();
     fnRunner.finishBundle();
   }
 
+  protected DoFnRunners.OutputManager createOutputManager() {
+    return new ParDoOutputManager();
+  }
+
   private class ParDoOutputManager implements DoFnRunners.OutputManager {
 
     @Nullable
     private OutputReceiver getReceiverOrNull(TupleTag<?> tag) {
+      List<OutputReceiver> receivers = getOutputReceivers();
       if (tag.equals(mainOutputTag)) {
-        return receivers[0];
+        return receivers.get(0);
       } else if (sideOutputTags.contains(tag)) {
-        return receivers[sideOutputTags.indexOf(tag) + 1];
+        return receivers.get(sideOutputTags.indexOf(tag) + 1);
       } else {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
new file mode 100644
index 0000000..97201d0
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
@@ -0,0 +1,52 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Created by peihe on 26/07/2017.
+ */
+public class WriteOperation extends Operation {
+
+  private final Coder<Object> keyCoder;
+  private final Coder<Object> nullableValueCoder;
+
+  private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+
+  public WriteOperation(Coder<?> coder) {
+    super(0);
+    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder");
+    this.keyCoder = kvCoder.getKeyCoder();
+    this.nullableValueCoder = NullableCoder.of(kvCoder.getValueCoder());
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  @Override
+  public void process(Object elem) {
+    WindowedValue<KV<?, ?>> windowedElem = (WindowedValue<KV<?, ?>>) elem;
+    try {
+      ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+      keyCoder.encode(windowedElem.getValue().getKey(), keyStream);
+
+      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+      nullableValueCoder.encode(windowedElem.getValue().getValue(), valueStream);
+
+      taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray());
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index f5eee28..5fa499a 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -4,19 +4,12 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.log4j.BasicConfigurator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
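
For context, WordCountTest exercises roughly the pipeline below.
Count.perElement() expands into ParDo + GroupByKey + ParDo steps, which is
exactly the shape the new reducer path handles. An illustrative sketch, not
the test source; the input path and ExtractWordsFn are placeholders:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class WordCountSketch {
      static class ExtractWordsFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (String word : c.element().split("[^a-zA-Z']+")) {
            if (!word.isEmpty()) {
              c.output(word);
            }
          }
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(TextIO.read().from("/path/to/input.txt"))
            .apply(ParDo.of(new ExtractWordsFn()))
            .apply(Count.<String>perElement());  // ParDo + GroupByKey under the hood
        p.run().waitUntilFinish();
      }
    }
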