mr-runner: add ParDoOperation and support chaining of ParDos.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/389b02b5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/389b02b5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/389b02b5 Branch: refs/heads/mr-runner Commit: 389b02b576e1d9ea5123905048de3004e462a89a Parents: 0cbdc5b Author: Pei He <p...@apache.org> Authored: Tue Jul 25 21:44:34 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:47 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/mapreduce/MapReduceRunner.java | 6 +- .../mapreduce/translation/BeamMapper.java | 57 ++----- .../runners/mapreduce/translation/Graph.java | 71 +++++---- .../mapreduce/translation/GraphConverter.java | 29 ++-- .../mapreduce/translation/GraphPlanner.java | 25 ++-- .../mapreduce/translation/JobPrototype.java | 71 +++++---- .../mapreduce/translation/OutputReceiver.java | 53 +++++++ .../mapreduce/translation/ParDoOperation.java | 149 +++++++++++++++++++ .../translation/SerializedPipelineOptions.java | 76 ++++++++++ .../mapreduce/translation/package-info.java | 22 +++ .../beam/runners/mapreduce/WordCountTest.java | 4 +- 11 files changed, 438 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 0e3142c..11ac9a7 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -3,7 +3,6 @@ package 
org.apache.beam.runners.mapreduce; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Throwables; -import java.io.IOException; import org.apache.beam.runners.mapreduce.translation.Graph; import org.apache.beam.runners.mapreduce.translation.GraphConverter; import org.apache.beam.runners.mapreduce.translation.GraphPlanner; @@ -48,10 +47,7 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { GraphPlanner planner = new GraphPlanner(); Graph fusedGraph = planner.plan(graph); for (Graph.Vertex vertex : fusedGraph.getAllVertices()) { - if (vertex.getTransform() instanceof GroupByKey - || vertex.getTransform() instanceof Read.Bounded) { - continue; - } else { + if (vertex.getStep().getTransform() instanceof GroupByKey) { JobPrototype jobPrototype = JobPrototype.create(1, vertex); try { Job job = jobPrototype.build(options.getJarClass(), new Configuration()); http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java index 9d2f80d..b74797d 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java @@ -2,17 +2,8 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.NullSideInputReader; -import 
org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.mapreduce.Mapper; import org.slf4j.Logger; @@ -24,29 +15,19 @@ import org.slf4j.LoggerFactory; public class BeamMapper<ValueInT, ValueOutT> extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> { - public static final String BEAM_SERIALIZED_DO_FN = "beam-serialized-do-fn"; - private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class); + public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = "beam-serialized-par-do-op"; - private DoFnRunner<ValueInT, ValueOutT> doFnRunner; - private PipelineOptions options; + private ParDoOperation parDoOperation; @Override protected void setup( Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { - String serializedDoFn = checkNotNull( - context.getConfiguration().get(BEAM_SERIALIZED_DO_FN), - BEAM_SERIALIZED_DO_FN); - doFnRunner = DoFnRunners.simpleRunner( - options, - (DoFn<ValueInT, ValueOutT>) SerializableUtils - .deserializeFromByteArray( - Base64.decodeBase64(serializedDoFn), "DoFn"), - NullSideInputReader.empty(), - new MROutputManager(context), - null, - ImmutableList.<TupleTag<?>>of(), - null, - WindowingStrategy.globalDefault()); + String serializedParDo = checkNotNull( + context.getConfiguration().get(BEAM_SERIALIZED_PAR_DO_OPERATION), + BEAM_SERIALIZED_PAR_DO_OPERATION); + parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray( + Base64.decodeBase64(serializedParDo), "DoFn"); + parDoOperation.start(); } @Override @@ -54,30 +35,12 @@ public class BeamMapper<ValueInT, ValueOutT> Object key, WindowedValue<ValueInT> value, Mapper<Object, 
WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { - LOG.info("key: {}, value: {}.", key, value); - doFnRunner.processElement(value); + parDoOperation.process(value); } @Override protected void cleanup( Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context) { - } - - class MROutputManager implements DoFnRunners.OutputManager { - - private final Mapper<Object, Object, Object, Object>.Context context; - - MROutputManager(Mapper<?, ?, ?, ?>.Context context) { - this.context = (Mapper<Object, Object, Object, Object>.Context) context; - } - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - try { - context.write("global", output); - } catch (Exception e) { - Throwables.throwIfUnchecked(e); - } - } + parDoOperation.finish(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index da31f89..6ea774b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -18,6 +18,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; @@ -26,7 +27,7 @@ import org.apache.commons.lang.builder.ToStringStyle; */ public class Graph { - private final Map<PTransform, Vertex> vertices; + private 
final Map<Step, Vertex> vertices; private final Map<HeadTail, Edge> edges; private final Set<Vertex> leafVertices; @@ -36,10 +37,10 @@ public class Graph { this.leafVertices = Sets.newHashSet(); } - public Vertex addVertex(PTransform<?, ?> transform) { - checkState(!vertices.containsKey(transform)); - Vertex v = new Vertex(transform); - vertices.put(transform, v); + public Vertex addVertex(Step step) { + checkState(!vertices.containsKey(step)); + Vertex v = new Vertex(step); + vertices.put(step, v); leafVertices.add(v); return v; } @@ -55,8 +56,8 @@ public class Graph { return e; } - public Vertex getVertex(PTransform<?, ?> transform) { - return vertices.get(transform); + public Vertex getVertex(Step step) { + return vertices.get(step); } public Edge getEdge(Vertex head, Vertex tail) { @@ -84,18 +85,18 @@ public class Graph { //TODO: add equals, hashCode, toString for following classses. public static class Vertex { - private final PTransform<?, ?> transform; + private final Step step; private final Set<Edge> incoming; private final Set<Edge> outgoing; - public Vertex(PTransform transform) { - this.transform = checkNotNull(transform, "transform"); + public Vertex(Step step) { + this.step = checkNotNull(step, "step"); this.incoming = Sets.newHashSet(); this.outgoing = Sets.newHashSet(); } - public PTransform<?, ?> getTransform() { - return transform; + public Step getStep() { + return step; } public Set<Edge> getIncoming() { @@ -107,11 +108,12 @@ public class Graph { } public boolean isSource() { + PTransform<?, ?> transform = step.getTransform(); return transform instanceof Read.Bounded || transform instanceof Read.Unbounded; } public boolean isGroupByKey() { - return transform instanceof GroupByKey; + return step.getTransform() instanceof GroupByKey; } public void addIncoming(Edge edge) { @@ -123,6 +125,7 @@ public class Graph { } public void accept(GraphVisitor visitor) { + PTransform<?, ?> transform = step.getTransform(); if (transform instanceof 
ParDo.SingleOutput || transform instanceof ParDo.MultiOutput) { visitor.visitParDo(this); } else if (transform instanceof GroupByKey) { @@ -144,14 +147,14 @@ public class Graph { } if (obj instanceof Vertex) { Vertex other = (Vertex) obj; - return transform.equals(other.transform); + return step.equals(other.step); } return false; } @Override public int hashCode() { - return Objects.hash(this.getClass(), transform); + return Objects.hash(this.getClass(), step); } @Override @@ -218,7 +221,7 @@ public class Graph { } public static class NodePath { - private final LinkedList<PTransform<?, ?>> path; + private final LinkedList<Step> path; public NodePath() { this.path = new LinkedList<>(); @@ -228,16 +231,16 @@ public class Graph { this.path = new LinkedList<>(nodePath.path); } - public void addFirst(PTransform<?, ?> transform) { - path.addFirst(transform); + public void addFirst(Step step) { + path.addFirst(step); } - public void addLast(PTransform<?, ?> transform) { - path.addLast(transform); + public void addLast(Step step) { + path.addLast(step); } - public Iterable<PTransform<?, ?>> transforms() { - return path; + public Iterable<Step> steps() { + return ImmutableList.copyOf(path); } @Override @@ -260,15 +263,33 @@ public class Graph { @Override public String toString() { StringBuilder sb = new StringBuilder(); - for (PTransform<?, ?> collect : path) { - sb.append(collect.getName() + "|"); + for (Step step : path) { + sb.append(step.getFullName() + "|"); } - // sb.deleteCharAt(sb.length() - 1); + sb.deleteCharAt(sb.length() - 1); return sb.toString(); } } @AutoValue + public abstract static class Step { + abstract String getFullName(); + // TODO: remove public + public abstract PTransform<?, ?> getTransform(); + abstract List<TupleTag<?>> getInputs(); + abstract List<TupleTag<?>> getOutputs(); + + public static Step of( + String fullName, + PTransform<?, ?> transform, + List<TupleTag<?>> inputs, + List<TupleTag<?>> outputs) { + return new 
org.apache.beam.runners.mapreduce.translation.AutoValue_Graph_Step( + fullName, transform, inputs, outputs); + } + } + + @AutoValue public abstract static class HeadTail { abstract Vertex getHead(); abstract Vertex getTail(); http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java index 359a6e2..bd56ac5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java @@ -1,40 +1,49 @@ package org.apache.beam.runners.mapreduce.translation; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import java.util.Map; -import java.util.Set; import org.apache.beam.runners.mapreduce.MapReduceRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; /** * Pipeline translator for {@link MapReduceRunner}. 
*/ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { - private final Map<PValue, Graph.Vertex> outputToProducer; + private final Map<PValue, TupleTag<?>> pValueToTupleTag; + private final Map<TupleTag<?>, Graph.Vertex> outputToProducer; private final Graph graph; public GraphConverter() { + this.pValueToTupleTag = Maps.newHashMap(); this.outputToProducer = Maps.newHashMap(); this.graph = new Graph(); } @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - Graph.Vertex v = graph.addVertex(node.getTransform()); - - for (PValue input : node.getInputs().values()) { - if (outputToProducer.containsKey(input)) { - Graph.Vertex producer = outputToProducer.get(input); + Graph.Step step = Graph.Step.of( + node.getFullName(), + node.getTransform(), + ImmutableList.copyOf(node.getInputs().keySet()), + ImmutableList.copyOf(node.getOutputs().keySet())); + Graph.Vertex v = graph.addVertex(step); + + for (PValue pValue : node.getInputs().values()) { + TupleTag<?> tag = pValueToTupleTag.get(pValue); + if (outputToProducer.containsKey(tag)) { + Graph.Vertex producer = outputToProducer.get(tag); graph.addEdge(producer, v); } } - for (PValue output : node.getOutputs().values()) { - outputToProducer.put(output, v); + for (Map.Entry<TupleTag<?>, PValue> entry : node.getOutputs().entrySet()) { + pValueToTupleTag.put(entry.getValue(), entry.getKey()); + outputToProducer.put(entry.getKey(), v); } } http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index 793efd7..9198b28 100644 --- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -31,23 +31,24 @@ public class GraphPlanner { // drop if read is leaf vertex. return; } - Graph.Vertex v = fusedGraph.addVertex(read.getTransform()); - workingPath.addFirst(read.getTransform()); + Graph.Vertex v = fusedGraph.addVertex(read.getStep()); + workingPath.addFirst(read.getStep()); Graph.Edge edge = fusedGraph.addEdge(v, workingVertex); edge.addPath(workingPath); } @Override public void visitParDo(Graph.Vertex parDo) { + Graph.Step step = parDo.getStep(); checkArgument( - parDo.getTransform().getAdditionalInputs().isEmpty(), - "Side inputs are not supported."); + step.getTransform().getAdditionalInputs().isEmpty(), + "Side inputs are not " + "supported."); if (workingVertex == null) { // Leaf vertex - workingVertex = fusedGraph.addVertex(parDo.getTransform()); + workingVertex = fusedGraph.addVertex(step); workingPath = new Graph.NodePath(); } else { - workingPath.addFirst(parDo.getTransform()); + workingPath.addFirst(step); } checkArgument( parDo.getIncoming().size() == 1, @@ -74,10 +75,11 @@ public class GraphPlanner { if (workingVertex == null) { return; } - Graph.Vertex v = fusedGraph.addVertex(groupByKey.getTransform()); - workingPath.addFirst(groupByKey.getTransform()); - Graph.Edge edge = fusedGraph.addEdge(v, workingVertex); + Graph.Step step = groupByKey.getStep(); + Graph.Vertex addedGroupByKey = fusedGraph.addVertex(step); + Graph.Edge edge = fusedGraph.addEdge(addedGroupByKey, workingVertex); edge.addPath(workingPath); + workingVertex = addedGroupByKey; processParent(groupByKey.getIncoming().iterator().next().getHead()); } @@ -86,13 +88,14 @@ public class GraphPlanner { } private void processParent(Graph.Vertex parent) { - Graph.Vertex v = fusedGraph.getVertex(parent.getTransform()); + Graph.Step step = parent.getStep(); + Graph.Vertex v = 
fusedGraph.getVertex(step); if (v == null) { parent.accept(this); } else { // TODO: parent is consumed more than once. // It is duplicated in multiple outgoing path. Figure out the impact. - workingPath.addFirst(parent.getTransform()); + workingPath.addFirst(step); fusedGraph.getEdge(v, workingVertex).addPath(workingPath); } } http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index bdbbe5d..7cdf697 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -4,21 +4,23 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; -import javax.annotation.Nullable; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; 
import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -51,8 +53,9 @@ public class JobPrototype { // Setup BoundedSources in BeamInputFormat. // TODO: support more than one inputs Graph.Vertex head = Iterables.getOnlyElement(vertex.getIncoming()).getHead(); - checkState(head.getTransform() instanceof Read.Bounded); - Read.Bounded read = (Read.Bounded) head.getTransform(); + Graph.Step headStep = head.getStep(); + checkState(headStep.getTransform() instanceof Read.Bounded); + Read.Bounded read = (Read.Bounded) headStep.getTransform(); conf.set( BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE, Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource()))); @@ -62,34 +65,52 @@ public class JobPrototype { // TODO: support more than one out going edge. Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing()); Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths()); - List<DoFn> doFns = new ArrayList<>(); - doFns.addAll(FluentIterable.from(outPath.transforms()) - .filter(new Predicate<PTransform<?, ?>>() { + List<Graph.Step> parDos = new ArrayList<>(); + parDos.addAll(FluentIterable.from(outPath.steps()) + .filter(new Predicate<Graph.Step>() { @Override - public boolean apply(PTransform<?, ?> input) { - return !(input instanceof Read.Bounded); - } - }) - .transform(new Function<PTransform<?, ?>, DoFn>() { - @Override - public DoFn apply(PTransform<?, ?> input) { - checkArgument( - input instanceof ParDo.SingleOutput, "Only support ParDo.SingleOutput."); - ParDo.SingleOutput parDo = (ParDo.SingleOutput) input; - return parDo.getFn(); + public boolean apply(Graph.Step input) { + PTransform<?, ?> transform = input.getTransform(); + return transform instanceof ParDo.SingleOutput + || transform instanceof ParDo.MultiOutput; }}) .toList()); - if (vertex.getTransform() instanceof ParDo.SingleOutput) { - doFns.add(((ParDo.SingleOutput) 
vertex.getTransform()).getFn()); - } else if (vertex.getTransform() instanceof ParDo.MultiOutput) { - doFns.add(((ParDo.MultiOutput) vertex.getTransform()).getFn()); + Graph.Step vertexStep = vertex.getStep(); + if (vertexStep.getTransform() instanceof ParDo.SingleOutput + || vertexStep.getTransform() instanceof ParDo.MultiOutput) { + parDos.add(vertexStep); + } + + ParDoOperation root = null; + ParDoOperation prev = null; + for (Graph.Step step : parDos) { + ParDoOperation current = new ParDoOperation( + getDoFn(step.getTransform()), + PipelineOptionsFactory.create(), + (TupleTag<Object>) step.getOutputs().iterator().next(), + ImmutableList.<TupleTag<?>>of(), + WindowingStrategy.globalDefault()); + if (root == null) { + root = current; + } else { + // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero. + current.attachInput(prev, 0); + } + prev = current; } conf.set( - BeamMapper.BEAM_SERIALIZED_DO_FN, - Base64.encodeBase64String(SerializableUtils.serializeToByteArray( - Iterables.getOnlyElement(doFns)))); + BeamMapper.BEAM_SERIALIZED_PAR_DO_OPERATION, + Base64.encodeBase64String(SerializableUtils.serializeToByteArray(root))); job.setMapperClass(BeamMapper.class); job.setOutputFormatClass(NullOutputFormat.class); return job; } + + private DoFn<Object, Object> getDoFn(PTransform<?, ?> transform) { + if (transform instanceof ParDo.SingleOutput) { + return ((ParDo.SingleOutput) transform).getFn(); + } else { + return ((ParDo.MultiOutput) transform).getFn(); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java new file mode 100644 index 0000000..59cee3c --- 
/dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * OutputReceiver that forwards each input it receives to each of a list of down stream + * ParDoOperations. + */ +public class OutputReceiver implements Serializable { + private final List<ParDoOperation> receiverParDos = new ArrayList<>(); + + /** + * Adds a new receiver that this OutputReceiver forwards to. + */ + public void addOutput(ParDoOperation receiver) { + receiverParDos.add(receiver); + } + + public List<ParDoOperation> getReceiverParDos() { + return ImmutableList.copyOf(receiverParDos); + } + + /** + * Processes the element. 
+ */ + public void process(Object elem) { + for (ParDoOperation out : receiverParDos) { + if (out != null) { + out.process(elem); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java new file mode 100644 index 0000000..97473bb --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Operation for ParDo. + */ +public class ParDoOperation implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class); + + private final DoFn<Object, Object> doFn; + private final SerializedPipelineOptions options; + private final TupleTag<Object> mainOutputTag; + private final List<TupleTag<?>> sideOutputTags; + private final WindowingStrategy<?, ?> windowingStrategy; + private final OutputReceiver[] receivers; + + private DoFnRunner<Object, Object> fnRunner; + + public ParDoOperation( + DoFn<Object, Object> doFn, + PipelineOptions options, + TupleTag<Object> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + WindowingStrategy<?, ?> windowingStrategy) { + this.doFn = checkNotNull(doFn, "doFn"); + this.options = new SerializedPipelineOptions(checkNotNull(options, "options")); + this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag"); + this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags"); + this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy"); + int numOutputs = 1 + sideOutputTags.size(); + this.receivers = new OutputReceiver[numOutputs]; + for (int i = 0; i < numOutputs; ++i) { + receivers[i] = new OutputReceiver(); + } + } + + /** + * Adds an input to this ParDoOperation, coming from the 
given output of the given source. + */ + public void attachInput(ParDoOperation source, int outputNum) { + OutputReceiver fanOut = source.receivers[outputNum]; + fanOut.addOutput(this); + } + + /** + * Starts this Operation's execution. + * + * <p>Called after all successors consuming operations have been started. + */ + public void start() { + fnRunner = DoFnRunners.simpleRunner( + options.getPipelineOptions(), + doFn, + NullSideInputReader.empty(), + new ParDoOutputManager(), + mainOutputTag, + sideOutputTags, + null, + windowingStrategy); + fnRunner.startBundle(); + for (OutputReceiver receiver : receivers) { + if (receiver == null) { + continue; + } + for (ParDoOperation parDo : receiver.getReceiverParDos()) { + parDo.start(); + } + } + } + + /** + * Processes the element. + */ + public void process(Object elem) { + LOG.info("elem: {}.", elem); + fnRunner.processElement((WindowedValue<Object>) elem); + } + + /** + * Finishes this Operation's execution. + * + * <p>Called after all predecessors producing operations have been finished. 
+ */ + public void finish() { + for (OutputReceiver receiver : receivers) { + if (receiver == null) { + continue; + } + for (ParDoOperation parDo : receiver.getReceiverParDos()) { + parDo.finish(); + } + } + fnRunner.finishBundle(); + } + + private class ParDoOutputManager implements DoFnRunners.OutputManager { + + @Nullable + private OutputReceiver getReceiverOrNull(TupleTag<?> tag) { + if (tag.equals(mainOutputTag)) { + return receivers[0]; + } else if (sideOutputTags.contains(tag)) { + return receivers[sideOutputTags.indexOf(tag) + 1]; + } else { + return null; + } + } + + @Override + public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) { + OutputReceiver receiver = getReceiverOrNull(tupleTag); + if (receiver != null) { + receiver.process(windowedValue); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java new file mode 100644 index 0000000..5c37b7c --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SerializedPipelineOptions.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.common.ReflectHelpers; + +/** + * Encapsulates the PipelineOptions in serialized form to ship them to the cluster. + */ +public class SerializedPipelineOptions implements Serializable { + + private final byte[] serializedOptions; + + /** Lazily initialized copy of deserialized options. 
*/ + private transient PipelineOptions pipelineOptions; + + public SerializedPipelineOptions(PipelineOptions options) { + checkNotNull(options, "PipelineOptions must not be null."); + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + createMapper().writeValue(baos, options); + this.serializedOptions = baos.toByteArray(); + } catch (Exception e) { + throw new RuntimeException("Couldn't serialize PipelineOptions.", e); + } + + } + + public PipelineOptions getPipelineOptions() { + if (pipelineOptions == null) { + try { + pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class); + + FileSystems.setDefaultPipelineOptions(pipelineOptions); + } catch (IOException e) { + throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e); + } + } + + return pipelineOptions; + } + + /** + * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing + * for user specified configuration injection into the ObjectMapper. This supports user custom + * types on {@link PipelineOptions}. + */ + private static ObjectMapper createMapper() { + return new ObjectMapper().registerModules( + ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java new file mode 100644 index 0000000..c9360ac --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation for translating Beam pipelines to MapReduce jobs. + */ +package org.apache.beam.runners.mapreduce.translation; http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java index 80df3e1..f5eee28 100644 --- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java +++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java @@ -78,8 +78,8 @@ public class WordCountTest { // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the // static FormatAsTextFn() to the ParDo transform. p.apply("ReadLines", TextIO.read().from(input)) - .apply(ParDo.of(new ExtractWordsFn())); -// .apply(Count.<String>perElement()) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); // .apply(MapElements.via(new FormatAsTextFn())) // .apply("WriteCounts", TextIO.write().to(output));