mr-runner: support graph visualization with dotfiles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98da2a2a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98da2a2a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98da2a2a Branch: refs/heads/mr-runner Commit: 98da2a2ac88c544dc3623b4f6bbe1cbbfaf569b2 Parents: 16e6320 Author: Pei He <p...@apache.org> Authored: Wed Aug 2 19:19:14 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:48 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/MapReducePipelineOptions.java | 1 - .../beam/runners/mapreduce/MapReduceRunner.java | 12 ++- .../mapreduce/translation/DotfileWriter.java | 54 ++++++++++++ .../mapreduce/translation/GraphConverter.java | 88 +++++++++++++++++++- .../runners/mapreduce/translation/Graphs.java | 19 +++-- .../mapreduce/translation/JobPrototype.java | 6 +- .../translation/TranslationContext.java | 6 +- 7 files changed, 171 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java index 73c7d47..c37da58 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java @@ -38,7 +38,6 @@ public interface MapReducePipelineOptions extends PipelineOptions { PipelineOptionsFactory.Builder.class.getName(), "org.apache.beam.sdk.options.ProxyInvocationHandler"); - @Description("The jar class of the user Beam program.") @Default.InstanceFactory(JarClassInstanceFactory.class) Class<?> getJarClass(); http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index b6a82d1..c5626a4 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -20,9 +20,10 @@ package org.apache.beam.runners.mapreduce; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Throwables; -import org.apache.beam.runners.mapreduce.translation.Graphs; +import org.apache.beam.runners.mapreduce.translation.DotfileWriter; import org.apache.beam.runners.mapreduce.translation.GraphConverter; import org.apache.beam.runners.mapreduce.translation.GraphPlanner; +import org.apache.beam.runners.mapreduce.translation.Graphs; import org.apache.beam.runners.mapreduce.translation.JobPrototype; import org.apache.beam.runners.mapreduce.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; @@ -31,12 +32,16 @@ import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link PipelineRunner} for MapReduce. */ public class MapReduceRunner extends PipelineRunner<PipelineResult> { + private static final Logger LOG = LoggerFactory.getLogger(MapReduceRunner.class); + /** * Construct a runner from the provided options. * @@ -59,8 +64,13 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { GraphConverter graphConverter = new GraphConverter(context); pipeline.traverseTopologically(graphConverter); + LOG.info(graphConverter.getDotfile()); + GraphPlanner planner = new GraphPlanner(); Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph()); + + LOG.info(DotfileWriter.toDotfile(fusedGraph)); + int stageId = 0; for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options); http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java new file mode 100644 index 0000000..5b0fcd8 --- /dev/null +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.mapreduce.translation; + +/** + * Class that outputs {@link Graph} to dot file. + */ +public class DotfileWriter { + + public static <StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.AbstractTag> + String toDotfile(Graphs.FusedGraph fusedGraph) { + StringBuilder sb = new StringBuilder(); + sb.append("\ndigraph G {\n"); + + int i = 0; + for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + sb.append(String.format(" subgraph \"cluster_%d\" {\n", i++)); + for (Graphs.Step step : fusedStep.getSteps()) { + sb.append(String.format(" \"%s\" [shape=box];\n", step.getFullName())); + for (Graph.AbstractTag outTag : step.getOutputTags()) { + sb.append(String.format(" \"%s\" [shape=ellipse];\n", outTag)); + } + } + sb.append(String.format(" }")); + } + for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) { + for (Graphs.Step step : fusedStep.getSteps()) { + for (Graph.AbstractTag inTag : step.getInputTags()) { + sb.append(String.format(" \"%s\" -> \"%s\";\n", inTag, step)); + } + for (Graph.AbstractTag outTag : step.getOutputTags()) { + sb.append(String.format(" \"%s\" -> \"%s\";\n", step, outTag)); + } + } + } + sb.append("}\n"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java index 1e818fa..de1c80b 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java @@ -18,14 +18,15 @@ package org.apache.beam.runners.mapreduce.translation; import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.collect.Maps; import java.util.Map; +import java.util.Stack; import org.apache.beam.runners.mapreduce.MapReduceRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; /** * Pipeline translator for {@link MapReduceRunner}. @@ -33,11 +34,18 @@ import org.apache.beam.sdk.values.TupleTag; public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { private final TranslationContext context; - private final Map<PValue, TupleTag<?>> pValueToTupleTag; + private final Stack<StringBuilder> dotfileNodesBuilders; + private final Map<TransformHierarchy.Node, Integer> enclosedTransformCounts; + private final StringBuilder dotfileEdgesBuilder; + + private int indent; public GraphConverter(TranslationContext context) { this.context = checkNotNull(context, "context"); - this.pValueToTupleTag = Maps.newHashMap(); + this.enclosedTransformCounts = Maps.newHashMap(); + this.dotfileNodesBuilders = new Stack<>(); + this.dotfileEdgesBuilder = new StringBuilder(); + this.indent = 0; } @Override @@ -45,20 +53,47 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { // check if current composite transforms need to be translated. // If not, all sub transforms will be translated in visitPrimitiveTransform. PTransform<?, ?> transform = node.getTransform(); + dotfileNodesBuilders.push(new StringBuilder()); if (transform != null) { + markEnclosedTransformCounts(node); TransformTranslator translator = TranslatorRegistry.getTranslator(transform); if (translator != null && applyCanTranslate(transform, node, translator)) { applyTransform(transform, node, translator); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } + indent += 2; } return CompositeBehavior.ENTER_TRANSFORM; } @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) { + if (node.getTransform() != null) { + if (enclosedTransformCounts.get(node) > 1) { + dotfileNodesBuilders.peek().insert(0, new StringBuilder() + .append(getIndent()).append( + String.format("subgraph \"cluster_%s\" {", node.getFullName())) + .append('\n') + .append(getIndent()).append( + String.format(" label=\"%s\";", node.getFullName())) + .append('\n') + .toString()); + dotfileNodesBuilders.peek().append(new StringBuilder() + .append(getIndent()).append("}").append('\n') + .toString()); + } + StringBuilder top = dotfileNodesBuilders.pop(); + dotfileNodesBuilders.peek().append(top.toString()); + indent -= 2; + } + } + + @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { if (!node.isRootNode()) { + markEnclosedTransformCounts(node); + PTransform<?, ?> transform = node.getTransform(); TransformTranslator translator = TranslatorRegistry.getTranslator(transform); if (translator == null || !applyCanTranslate(transform, node, translator)) { @@ -69,10 +104,48 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { } } + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) { + dotfileNodesBuilders.peek().append(getIndent()) + .append(String.format("\"%s\" [shape=ellipse];", value.getName())) + .append('\n'); + } + + private void markEnclosedTransformCounts(TransformHierarchy.Node node) { + TransformHierarchy.Node parent = node.getEnclosingNode(); + Integer primitiveCount = enclosedTransformCounts.get(parent); + if (primitiveCount == null) { + primitiveCount = 0; + } + enclosedTransformCounts.put(parent, primitiveCount + 1); + } + + public String getDotfile() { + return String.format( + "\ndigraph G {\n%s%s}\n", + dotfileNodesBuilders.peek().toString(), + dotfileEdgesBuilder.toString()); + } + private <T extends PTransform<?, ?>> void applyTransform( PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) { + dotfileNodesBuilders.peek() + .append(getIndent()) + .append(String.format("\"%s\" [shape=box];", node.getFullName())) + .append('\n'); + for (PValue input : node.getInputs().values()) { + dotfileEdgesBuilder + .append(String.format(" \"%s\" -> \"%s\";", input.getName(), node.getFullName())) + .append('\n'); + } + for (PValue output : node.getOutputs().values()) { + dotfileEdgesBuilder + .append(String.format(" \"%s\" -> \"%s\";", node.getFullName(), output.getName())) + .append('\n'); + } + @SuppressWarnings("unchecked") T typedTransform = (T) transform; @SuppressWarnings("unchecked") @@ -92,4 +165,13 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults { context.getUserGraphContext().setCurrentNode(node); return typedTranslator.canTranslate(typedTransform, context); } + + private String getIndent() { + StringBuilder ret = new StringBuilder(); + for (int i = 0; i < indent; ++i) { + ret.append(' '); + } + return ret.toString(); + } } + http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java index 029d425..cef5afc 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java @@ -153,10 +153,6 @@ public class Graphs { } return sb.toString(); } - - public String getFullName() { - return toString(); - } } @AutoValue @@ -173,16 +169,27 @@ public class Graphs { return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step( inputTags, outputTags, fullName, operation); } + + @Override + public String toString() { + return getFullName(); + } } @AutoValue public abstract static class Tag extends Graph.AbstractTag { + abstract String getName(); abstract TupleTag<?> getTupleTag(); abstract Coder<?> getCoder(); - public static Tag of(TupleTag<?> tupleTag, Coder<?> coder) { + @Override + public String toString() { + return getName(); + } + + public static Tag of(String name, TupleTag<?> tupleTag, Coder<?> coder) { return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag( - tupleTag, coder); + name, tupleTag, coder); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index c336a70..24feebd 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -89,10 +89,12 @@ public class JobPrototype { WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy(); KvCoder<?, ?> kvCoder = operation.getKvCoder(); + String reifyStepName = groupByKey.getFullName() + "-Reify"; Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy); - Graphs.Tag reifyOutputTag = Graphs.Tag.of(new TupleTag<Object>(), reifyValueCoder); + Graphs.Tag reifyOutputTag = Graphs.Tag.of( + reifyStepName + ".out", new TupleTag<Object>(), reifyValueCoder); Graphs.Step reifyStep = Graphs.Step.of( - groupByKey.getFullName() + "-Reify", + reifyStepName, new ReifyTimestampAndWindowsParDoOperation(options, operation.getWindowingStrategy()), groupByKey.getInputTags(), ImmutableList.of(reifyOutputTag)); http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index 0df365e..2b51df5 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -105,8 +105,9 @@ public class TranslationContext { checkState( pValueToTupleTag.containsKey(pValue), String.format("Failed to find TupleTag for pValue: %s.", pValue)); + PCollection<?> pc = (PCollection<?>) pValue; return Graphs.Tag.of( - pValueToTupleTag.get(pValue), ((PCollection<?>) pValue).getCoder()); + pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); }}) .toList(); } @@ -116,7 +117,8 @@ public class TranslationContext { .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() { @Override public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) { - return Graphs.Tag.of(entry.getKey(), ((PCollection<?>) entry.getValue()).getCoder()); + PCollection<?> pc = (PCollection<?>) entry.getValue(); + return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder()); }}) .toList(); }