johnyangk commented on a change in pull request #193: [NEMO-338] SkewSamplingPass
URL: https://github.com/apache/incubator-nemo/pull/193#discussion_r258359366
 
 

 ##########
 File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SamplingSkewReshapingPass.java
 ##########
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
+
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.dag.Edge;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex;
+import org.apache.nemo.common.ir.vertex.utility.MessageBarrierVertex;
+import org.apache.nemo.common.ir.vertex.utility.SamplingVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Optimizes the PartitionSet property of shuffle edges to handle data skews using the SamplingVertex.
+ *
+ * This pass effectively partitions the IRDAG by non-oneToOne edges, clones each subDAG partition using SamplingVertex
+ * to process sampled data, and executes each cloned partition prior to executing the corresponding original partition.
+ *
+ * Suppose the IRDAG is partitioned into two sub-DAGs as follows:
+ * P1 - P2
+ *
+ * Then, this pass will produce something like:
+ * P1' - P1 - P2
+ *          - P2' - P2
+ * where Px' consists of SamplingVertex objects that clone the execution of Px.
+ *
+ * For each Px' this pass also inserts a MessageBarrierVertex, to use its data statistics for dynamically optimizing
+ * the execution behaviors of Px.
+ */
+@Requires(CommunicationPatternProperty.class)
+public final class SamplingSkewReshapingPass extends ReshapingPass {
+  private static final Logger LOG = LoggerFactory.getLogger(SamplingSkewReshapingPass.class.getName());
+  private static final float SAMPLE_RATE = 0.1f;
+
+  /**
+   * Default constructor.
+   */
+  public SamplingSkewReshapingPass() {
+    super(SamplingSkewReshapingPass.class);
+  }
+
+  @Override
+  public IRDAG apply(final IRDAG dag) {
+    dag.topologicalDo(v -> {
+      for (final IREdge e : dag.getIncomingEdgesOf(v)) {
+        if (CommunicationPatternProperty.Value.Shuffle.equals(
+          e.getPropertyValue(CommunicationPatternProperty.class).get())) {
+          // Compute the partition and its source vertices
+          final IRVertex shuffleWriter = e.getSrc();
+          final Set<IRVertex> partitionAll = recursivelyBuildPartition(shuffleWriter, dag);
+          final Set<IRVertex> partitionSources = partitionAll.stream().filter(vertexInPartition ->
+            !dag.getIncomingEdgesOf(vertexInPartition).stream()
+              .map(Edge::getSrc)
+              .anyMatch(partitionAll::contains)
+          ).collect(Collectors.toSet());
+
+          // Insert sampling vertices.
+          final Set<SamplingVertex> samplingVertices = partitionAll
+            .stream()
+            .map(vertexInPartition -> new SamplingVertex(vertexInPartition, SAMPLE_RATE))
+            .collect(Collectors.toSet());
+          dag.insert(samplingVertices, partitionSources);
+
+          // Insert the message vertex.
+          // We first obtain a clonedShuffleEdge to analyze the data statistics of the shuffle outputs of
+          // the sampling vertex right before shuffle.
+          final SamplingVertex rightBeforeShuffle = samplingVertices.stream()
+            .filter(sv -> sv.getOriginalVertex().equals(e.getSrc()))
+            .findFirst()
+            .orElseThrow(() -> new IllegalStateException());
+          final IREdge clonedShuffleEdge = rightBeforeShuffle.getCloneOfOriginalEdge(e);
+
+          final KeyExtractor keyExtractor = e.getPropertyValue(KeyExtractorProperty.class).get();
 
 Review comment:
   Thanks for bringing this up!
   
   (1) Regarding pipelining MessageBarrierVertex within a single stage with 
parent sampling vertices:
   
   I've changed the semantics of insert() so that it uses
   SamplingVertex(NewVertex) instead of NewVertex whenever an existing vertex
   that NewVertex will connect to is a SamplingVertex. I think this is a
   reasonable assumption: new vertices that consume outputs from sampling
   vertices process only a subset of the data anyway, and no such new vertex
   reaches the original DAG except via control edges. With this change, Nemo
   can pipeline the MessageBarrierVertex (wrapped inside a SamplingVertex),
   avoiding duplicate data materialization.
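
To make the insert() rule in (1) concrete, here is a minimal standalone sketch. It is not the code in this PR: `Vertex` and `SamplingWrapper` are hypothetical stand-ins for IRVertex and SamplingVertex, and `resolveVertexToInsert` is a helper name invented for illustration. It only shows the wrapping decision, not the edge rewiring.

```java
import java.util.Arrays;
import java.util.List;

public class InsertSemanticsSketch {
  /** Hypothetical stand-in for an IR vertex. */
  static class Vertex {
    final String name;

    Vertex(final String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** Hypothetical stand-in for a sampling vertex that wraps an original vertex. */
  static final class SamplingWrapper extends Vertex {
    final Vertex original;

    SamplingWrapper(final Vertex original) {
      super("Sampling(" + original.name + ")");
      this.original = original;
    }
  }

  /**
   * Decides which vertex gets added: if any vertex the new one connects to is a
   * sampling vertex, the new vertex is wrapped too, so it also runs on sampled data.
   */
  static Vertex resolveVertexToInsert(final Vertex newVertex, final List<Vertex> connectTo) {
    final boolean touchesSampling = connectTo.stream().anyMatch(v -> v instanceof SamplingWrapper);
    if (touchesSampling && !(newVertex instanceof SamplingWrapper)) {
      return new SamplingWrapper(newVertex);
    }
    return newVertex;
  }

  public static void main(final String[] args) {
    final Vertex writer = new Vertex("ShuffleWriter");
    final Vertex sampledWriter = new SamplingWrapper(writer);
    final Vertex barrier = new Vertex("MessageBarrier");

    // Connecting to a sampling vertex: the barrier is wrapped.
    System.out.println(resolveVertexToInsert(barrier, Arrays.asList(sampledWriter)));
    // Connecting to an ordinary vertex: the barrier is inserted as-is.
    System.out.println(resolveVertexToInsert(barrier, Arrays.asList(writer)));
  }
}
```

Under these assumptions, the first call yields a wrapped vertex and the second returns the vertex unchanged, which mirrors why the MessageBarrierVertex can sit in the same stage as its sampling parents.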
   
   (2) Regarding connecting the message aggregation vertex to the partition 
Sources:
   
   I'd prefer not to do this, at least given our current use cases.
   
   Here's the physical DAG diagram of
   PerKeyMedianITCase#testLargeShuffleSamplingSkew (including the fix for (1)):
   
https://nemo.snuspl.snu.ac.kr:50443/nemo-dag-out/7a1c136ac24f427ebb4c34a43712da3f.svg
   
   In the diagram, the ScheduleGroup property is set such that the sampling
   partition always executes prior to the original partition. In particular,
   the ordering ScheduleGroup0(Stage3+Stage5) ==> ScheduleGroup1(Stage4+Stage6)
   is enforced although Stage4 ==> Stage6 is PUSH (which makes sense when
   considering each schedule group as a big vertex). The sampling should also
   happen prior to the execution of the original partition when
   Stage4 ==> Stage6 is PULL, although the schedule groups may differ in this
   case. I think this shows that a sequence of insert(samplingVertex) and
   insert(messageVertex) captures our intention fairly well.
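
As a rough illustration of the ordering described above, the sketch below groups stages by a schedule group index and runs the groups in ascending order. The stage-to-group assignment is taken from the diagram description in this comment; the class, the `executionOrder` helper, and the idea of modeling the scheduler as a sorted map are assumptions made for illustration, not Nemo's scheduler code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ScheduleGroupOrderSketch {
  /**
   * Groups stage names by schedule group index and returns the groups in
   * ascending index order, treating each group as one "big vertex".
   */
  static List<List<String>> executionOrder(final Map<String, Integer> stageToGroup) {
    final TreeMap<Integer, List<String>> byGroup = new TreeMap<>();
    for (final Map.Entry<String, Integer> entry : stageToGroup.entrySet()) {
      byGroup.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
    }
    // Sort names inside each group only to make the printed result deterministic.
    for (final List<String> stages : byGroup.values()) {
      Collections.sort(stages);
    }
    return new ArrayList<>(byGroup.values());
  }

  public static void main(final String[] args) {
    final Map<String, Integer> stageToGroup = new HashMap<>();
    stageToGroup.put("Stage3", 0); // sampling partition
    stageToGroup.put("Stage5", 0);
    stageToGroup.put("Stage4", 1); // original partition
    stageToGroup.put("Stage6", 1);

    // prints [[Stage3, Stage5], [Stage4, Stage6]]
    System.out.println(executionOrder(stageToGroup));
  }
}
```

Every stage in group 0 is listed before any stage in group 1, regardless of whether the edge between Stage4 and Stage6 is PUSH or PULL.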
   
   I did write some code to try to 'extend' the control edges from (sampling
   vertices) to (existing vertices), by adding new control edges from (new
   vertices that connect to sampling vertices) to (existing vertices) upon
   each insert(). However, I ultimately felt that this approach complicates
   the code quite a bit, so I reverted to the current approach, which I think
   works for the current use cases.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
