mr-runner: support graph visualization with dotfiles.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98da2a2a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98da2a2a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98da2a2a

Branch: refs/heads/mr-runner
Commit: 98da2a2ac88c544dc3623b4f6bbe1cbbfaf569b2
Parents: 16e6320
Author: Pei He <p...@apache.org>
Authored: Wed Aug 2 19:19:14 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineOptions.java     |  1 -
 .../beam/runners/mapreduce/MapReduceRunner.java | 12 ++-
 .../mapreduce/translation/DotfileWriter.java    | 54 ++++++++++++
 .../mapreduce/translation/GraphConverter.java   | 88 +++++++++++++++++++-
 .../runners/mapreduce/translation/Graphs.java   | 19 +++--
 .../mapreduce/translation/JobPrototype.java     |  6 +-
 .../translation/TranslationContext.java         |  6 +-
 7 files changed, 171 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 73c7d47..c37da58 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -38,7 +38,6 @@ public interface MapReducePipelineOptions extends PipelineOptions {
       PipelineOptionsFactory.Builder.class.getName(),
       "org.apache.beam.sdk.options.ProxyInvocationHandler");
 
-
   @Description("The jar class of the user Beam program.")
   @Default.InstanceFactory(JarClassInstanceFactory.class)
   Class<?> getJarClass();

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index b6a82d1..c5626a4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -20,9 +20,10 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
-import org.apache.beam.runners.mapreduce.translation.Graphs;
+import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.Graphs;
 import org.apache.beam.runners.mapreduce.translation.JobPrototype;
 import org.apache.beam.runners.mapreduce.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
@@ -31,12 +32,16 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link PipelineRunner} for MapReduce.
  */
 public class MapReduceRunner extends PipelineRunner<PipelineResult> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MapReduceRunner.class);
+
   /**
    * Construct a runner from the provided options.
    *
@@ -59,8 +64,13 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
     GraphConverter graphConverter = new GraphConverter(context);
     pipeline.traverseTopologically(graphConverter);
 
+    LOG.info(graphConverter.getDotfile());
+
     GraphPlanner planner = new GraphPlanner();
     Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+
+    LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
     int stageId = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
       JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, options);

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
new file mode 100644
index 0000000..5b0fcd8
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+/**
+ * Class that outputs {@link Graph} to dot file.
+ */
+public class DotfileWriter {
+
+  public static <StepT extends Graph.AbstractStep<TagT>, TagT extends Graph.AbstractTag>
+  String toDotfile(Graphs.FusedGraph fusedGraph) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("\ndigraph G {\n");
+
+    int i = 0;
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      sb.append(String.format("  subgraph \"cluster_%d\" {\n", i++));
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        sb.append(String.format("    \"%s\" [shape=box];\n", step.getFullName()));
+        for (Graph.AbstractTag outTag : step.getOutputTags()) {
+          sb.append(String.format("    \"%s\" [shape=ellipse];\n", outTag));
+        }
+      }
+      sb.append(String.format("  }"));
+    }
+    for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
+      for (Graphs.Step step : fusedStep.getSteps()) {
+        for (Graph.AbstractTag inTag : step.getInputTags()) {
+          sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, step));
+        }
+        for (Graph.AbstractTag outTag : step.getOutputTags()) {
+          sb.append(String.format("  \"%s\" -> \"%s\";\n", step, outTag));
+        }
+      }
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index 1e818fa..de1c80b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -18,14 +18,15 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Stack;
 import org.apache.beam.runners.mapreduce.MapReduceRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Pipeline translator for {@link MapReduceRunner}.
@@ -33,11 +34,18 @@ import org.apache.beam.sdk.values.TupleTag;
 public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
   private final TranslationContext context;
-  private final Map<PValue, TupleTag<?>> pValueToTupleTag;
+  private final Stack<StringBuilder> dotfileNodesBuilders;
+  private final Map<TransformHierarchy.Node, Integer> enclosedTransformCounts;
+  private final StringBuilder dotfileEdgesBuilder;
+
+  private int indent;
 
   public GraphConverter(TranslationContext context) {
     this.context = checkNotNull(context, "context");
-    this.pValueToTupleTag = Maps.newHashMap();
+    this.enclosedTransformCounts = Maps.newHashMap();
+    this.dotfileNodesBuilders = new Stack<>();
+    this.dotfileEdgesBuilder = new StringBuilder();
+    this.indent = 0;
   }
 
   @Override
@@ -45,20 +53,47 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
     // check if current composite transforms need to be translated.
     // If not, all sub transforms will be translated in visitPrimitiveTransform.
     PTransform<?, ?> transform = node.getTransform();
+    dotfileNodesBuilders.push(new StringBuilder());
     if (transform != null) {
+      markEnclosedTransformCounts(node);
       TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
 
       if (translator != null && applyCanTranslate(transform, node, translator)) {
         applyTransform(transform, node, translator);
         return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
       }
+      indent += 2;
     }
     return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    if (node.getTransform() != null) {
+      if (enclosedTransformCounts.get(node) > 1) {
+        dotfileNodesBuilders.peek().insert(0, new StringBuilder()
+            .append(getIndent()).append(
+                String.format("subgraph \"cluster_%s\" {", node.getFullName()))
+            .append('\n')
+            .append(getIndent()).append(
+                String.format("  label=\"%s\";", node.getFullName()))
+            .append('\n')
+            .toString());
+        dotfileNodesBuilders.peek().append(new StringBuilder()
+            .append(getIndent()).append("}").append('\n')
+            .toString());
+      }
+      StringBuilder top = dotfileNodesBuilders.pop();
+      dotfileNodesBuilders.peek().append(top.toString());
+      indent -= 2;
+    }
+  }
+
+  @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     if (!node.isRootNode()) {
+      markEnclosedTransformCounts(node);
+
       PTransform<?, ?> transform = node.getTransform();
       TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
       if (translator == null || !applyCanTranslate(transform, node, translator)) {
@@ -69,10 +104,48 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
     }
   }
 
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    dotfileNodesBuilders.peek().append(getIndent())
+        .append(String.format("\"%s\" [shape=ellipse];", value.getName()))
+        .append('\n');
+  }
+
+  private void markEnclosedTransformCounts(TransformHierarchy.Node node) {
+    TransformHierarchy.Node parent = node.getEnclosingNode();
+    Integer primitiveCount = enclosedTransformCounts.get(parent);
+    if (primitiveCount == null) {
+      primitiveCount = 0;
+    }
+    enclosedTransformCounts.put(parent, primitiveCount + 1);
+  }
+
+  public String getDotfile() {
+    return String.format(
+        "\ndigraph G {\n%s%s}\n",
+        dotfileNodesBuilders.peek().toString(),
+        dotfileEdgesBuilder.toString());
+  }
+
   private <T extends PTransform<?, ?>> void applyTransform(
       PTransform<?, ?> transform,
       TransformHierarchy.Node node,
       TransformTranslator<?> translator) {
+    dotfileNodesBuilders.peek()
+        .append(getIndent())
+        .append(String.format("\"%s\" [shape=box];", node.getFullName()))
+        .append('\n');
+    for (PValue input : node.getInputs().values()) {
+      dotfileEdgesBuilder
+          .append(String.format("  \"%s\" -> \"%s\";", input.getName(), node.getFullName()))
+          .append('\n');
+    }
+    for (PValue output : node.getOutputs().values()) {
+      dotfileEdgesBuilder
+          .append(String.format("  \"%s\" -> \"%s\";", node.getFullName(), output.getName()))
+          .append('\n');
+    }
+
     @SuppressWarnings("unchecked")
     T typedTransform = (T) transform;
     @SuppressWarnings("unchecked")
@@ -92,4 +165,13 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
     context.getUserGraphContext().setCurrentNode(node);
     return typedTranslator.canTranslate(typedTransform, context);
   }
+
+  private String getIndent() {
+    StringBuilder ret = new StringBuilder();
+    for (int i = 0; i < indent; ++i) {
+      ret.append(' ');
+    }
+    return ret.toString();
+  }
 }
+

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index 029d425..cef5afc 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -153,10 +153,6 @@ public class Graphs {
       }
       return sb.toString();
     }
-
-    public String getFullName() {
-      return toString();
-    }
   }
 
   @AutoValue
@@ -173,16 +169,27 @@ public class Graphs {
       return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Step(
           inputTags, outputTags, fullName, operation);
     }
+
+    @Override
+    public String toString() {
+      return getFullName();
+    }
   }
 
   @AutoValue
   public abstract static class Tag extends Graph.AbstractTag {
+    abstract String getName();
     abstract TupleTag<?> getTupleTag();
     abstract Coder<?> getCoder();
 
-    public static Tag of(TupleTag<?> tupleTag, Coder<?> coder) {
+    @Override
+    public String toString() {
+      return getName();
+    }
+
+    public static Tag of(String name, TupleTag<?> tupleTag, Coder<?> coder) {
       return new org.apache.beam.runners.mapreduce.translation.AutoValue_Graphs_Tag(
-          tupleTag, coder);
+          name, tupleTag, coder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index c336a70..24feebd 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -89,10 +89,12 @@ public class JobPrototype {
       WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy();
       KvCoder<?, ?> kvCoder = operation.getKvCoder();
 
+      String reifyStepName = groupByKey.getFullName() + "-Reify";
       Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy);
-      Graphs.Tag reifyOutputTag = Graphs.Tag.of(new TupleTag<Object>(), reifyValueCoder);
+      Graphs.Tag reifyOutputTag = Graphs.Tag.of(
+          reifyStepName + ".out", new TupleTag<Object>(), reifyValueCoder);
       Graphs.Step reifyStep = Graphs.Step.of(
-          groupByKey.getFullName() + "-Reify",
+          reifyStepName,
           new ReifyTimestampAndWindowsParDoOperation(options, operation.getWindowingStrategy()),
           groupByKey.getInputTags(),
           ImmutableList.of(reifyOutputTag));

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 0df365e..2b51df5 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -105,8 +105,9 @@ public class TranslationContext {
               checkState(
                   pValueToTupleTag.containsKey(pValue),
                   String.format("Failed to find TupleTag for pValue: %s.", pValue));
+              PCollection<?> pc = (PCollection<?>) pValue;
               return Graphs.Tag.of(
-                  pValueToTupleTag.get(pValue), ((PCollection<?>) pValue).getCoder());
+                  pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
             }})
           .toList();
     }
@@ -116,7 +117,8 @@ public class TranslationContext {
           .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
             @Override
             public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
-              return Graphs.Tag.of(entry.getKey(), ((PCollection<?>) entry.getValue()).getCoder());
+              PCollection<?> pc = (PCollection<?>) entry.getValue();
+              return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder());
             }})
           .toList();
     }

Reply via email to