sanha commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
URL: https://github.com/apache/incubator-nemo/pull/193#discussion_r257972283
 
 

 ##########
 File path: common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
 ##########
 @@ -151,73 +176,153 @@ public void insert(final StreamVertex streamVertex, 
final IREdge edgeToStreamize
    *        shuffleEdge - messageAggregatorVertex - broadcastEdge - dst
    * (the "Before" relationships are unmodified)
    *
+   * This preserves semantics as the results of the inserted message vertices 
are never consumed by the original IRDAG.
+   *
    * @param messageBarrierVertex to insert.
    * @param messageAggregatorVertex to insert.
    * @param mbvOutputEncoder to use.
    * @param mbvOutputDecoder to use.
    * @param edgesToGetStatisticsOf to examine.
+   * @param edgesToOptimize to optimize.
    */
   public void insert(final MessageBarrierVertex messageBarrierVertex,
                      final MessageAggregatorVertex messageAggregatorVertex,
                      final EncoderProperty mbvOutputEncoder,
                      final DecoderProperty mbvOutputDecoder,
-                     final Set<IREdge> edgesToGetStatisticsOf) {
-    if (edgesToGetStatisticsOf.stream().map(edge -> 
edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
-      throw new IllegalArgumentException("Not destined to the same vertex: " + 
edgesToGetStatisticsOf.toString());
-    }
-    final IRVertex dst = edgesToGetStatisticsOf.iterator().next().getDst();
-
+                     final Set<IREdge> edgesToGetStatisticsOf,
+                     final Set<IREdge> edgesToOptimize) {
     // Create a completely new DAG with the vertex inserted.
-    final DAGBuilder builder = new DAGBuilder();
+    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-    // Current metric collection id.
-    final int currentMetricCollectionId = metricCollectionId.incrementAndGet();
+    // All of the existing vertices and edges remain intact
+    modifiedDAG.topologicalDo(v -> {
+      builder.addVertex(v);
+      modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
+    });
 
-    // First, add all the vertices.
-    modifiedDAG.topologicalDo(v -> builder.addVertex(v));
+    ////////////////////////////////// STEP 1: Insert new vertices and edges
 
+    // From mav to dst
     // Add a control dependency (no output) from the messageAggregatorVertex 
to the destination.
     builder.addVertex(messageAggregatorVertex);
-    final IREdge noDataEdge = new 
IREdge(CommunicationPatternProperty.Value.BroadCast, messageAggregatorVertex, 
dst);
-    builder.connectVertices(noDataEdge);
+    final IRVertex dst = edgesToGetStatisticsOf.iterator().next().getDst();
+    builder.connectVertices(Util.createControlEdge(messageAggregatorVertex, 
dst));
+
+    // Build the edges: src - mbv - mav
+    for (final IREdge edge : edgesToGetStatisticsOf) {
+      final MessageBarrierVertex mbv = new 
MessageBarrierVertex<>(messageBarrierVertex.getMessageFunction());
+      builder.addVertex(mbv);
+
+      // From src to mbv
+      final IREdge clone = 
Util.cloneEdge(CommunicationPatternProperty.Value.OneToOne, edge, 
edge.getSrc(), mbv);
+      builder.connectVertices(clone);
+
+      // From mbv to mav
+      final IREdge edgeToABV = edgeBetweenMessageVertices(
+        mbv, messageAggregatorVertex, mbvOutputEncoder, mbvOutputDecoder);
+      builder.connectVertices(edgeToABV);
+    }
 
-    // Add the edges and the messageBarrierVertex.
+    ////////////////////////////////// STEP 2: Annotate the MessageId on 
optimization target edges
+
+    if (edgesToOptimize.stream().map(edge -> 
edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
+      throw new IllegalArgumentException("Not destined to the same vertex: " + 
edgesToOptimize.toString());
+    }
     modifiedDAG.topologicalDo(v -> {
-      for (final IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
-        if (edgesToGetStatisticsOf.contains(edge)) {
-          // MATCH!
-          final MessageBarrierVertex mbv = new 
MessageBarrierVertex<>(messageBarrierVertex.getMessageFunction());
-          builder.addVertex(mbv);
-
-          // Clone the edgeToGetStatisticsOf
-          final IREdge clone = new 
IREdge(CommunicationPatternProperty.Value.OneToOne, edge.getSrc(), mbv);
-          
clone.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-          
clone.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
-          
edge.getPropertyValue(AdditionalOutputTagProperty.class).ifPresent(tag -> {
-            clone.setProperty(AdditionalOutputTagProperty.of(tag));
-          });
-          builder.connectVertices(clone);
-
-          // messageBarrierVertex to the messageAggregatorVertex
-          final IREdge edgeToABV = edgeBetweenMessageVertices(mbv,
-            messageAggregatorVertex, mbvOutputEncoder, mbvOutputDecoder, 
currentMetricCollectionId);
-          builder.connectVertices(edgeToABV);
-
-          // The original edge
-          // We then insert the vertex with MessageBarrierTransform and vertex 
with MessageAggregatorTransform
-          // between the vertex and incoming vertices.
-          final IREdge edgeToOriginalDst =
-            new 
IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), 
edge.getSrc(), v);
-          edge.copyExecutionPropertiesTo(edgeToOriginalDst);
-          
edgeToOriginalDst.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId));
-          builder.connectVertices(edgeToOriginalDst);
-        } else {
-          // NO MATCH, so simply connect vertices as before.
-          builder.connectVertices(edge);
+      modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
+        if (edgesToOptimize.contains(inEdge)) {
+          
inEdge.setPropertyPermanently(MessageIdProperty.of(messageAggregatorVertex.getMessageId()));
         }
-      }
+      });
+    });
+
+    modifiedDAG = builder.build(); // update the DAG.
+  }
+
+  /**
+   * Inserts a set of samplingVertices that process sampled data.
+   *
+   * This method automatically inserts the following three types of edges.
+   * (1) Edges between samplingVertices to reflect the original relationship
+   * (2) Edges from the original IRDAG to samplingVertices that clone the 
inEdges of the original vertices
+   * (3) Edges from the samplingVertices to the original IRDAG to respect 
executeAfterSamplingVertices
+   *
+   * Suppose the caller supplies the following arguments to perform a "sampled 
run" of vertices {V1, V2},
+   * prior to executing them.
+   * - samplingVertices: {V1', V2'}
+   * - childrenOfSamplingVertices: {V1}
+   *
+   * Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
+   * After: V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - 
shuffleEdge - V3
+   *
+   * This preserves semantics as the original IRDAG remains unchanged and 
unaffected.
+   *
+   * @param samplingVertices to insert.
+   * @param executeAfterSamplingVertices that must be executed after 
samplingVertices.
+   */
+  public void insert(final Set<SamplingVertex> samplingVertices,
 
 Review comment:
   Why don't we take the `Set` of original vertices to sample, instead of
   sampling vertices that have already been created?
   If we accept sampling vertices, optimization pass builders could pass in
   sampling vertices that are already connected.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to