[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214384789
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
 ##
 @@ -429,8 +429,16 @@ private StageEdge getEdgeToOptimize(final String taskId) {
   .findFirst()
   .orElseThrow(() -> new RuntimeException());
 
+final List parentStages = planStateManager.getPhysicalPlan().getStageDAG()
+  .getParents(stagePutOnHold.getId());
+
+if (parentStages.size() > 1) {
 
 Review comment:
   Why is this assumption necessary? Maybe add a TODO?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214380969
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
 
 Review comment:
   Ditto.




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214379450
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/AggregationBarrierVertex.java
 ##
 @@ -15,55 +15,48 @@
  */
 package edu.snu.nemo.common.ir.vertex;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.DynamicOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
  * This class is generated in the DAG through
  * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
- * @param  type of the key of metric data.
- * @param  type of the value of metric data.
  */
-public final class MetricCollectionBarrierVertex extends IRVertex {
-  // Metric data used for dynamic optimization.
-  private Map metricData;
-  private final List blockIds;
-
+public final class AggregationBarrierVertex extends IRVertex {
 
 Review comment:
   Add class-level comments about the 'barrier' feature this vertex provides?




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214380320
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link Transform} relays input data from upstream vertex to downstream 
vertex promptly.
+ * This transform can be used for merging input data into the {@link 
OutputCollector}.
+ * @param  input type.
+ * @param  output type.
+ */
+public final class AggregateMetricTransform implements Transform {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AggregateMetricTransform.class.getName());
+  private OutputCollector outputCollector;
+  private O aggregatedDynOptData;
+
+  /**
+   * Default constructor.
+   */
+  public AggregateMetricTransform(final O aggregatedDynOptData) {
+this.aggregatedDynOptData = aggregatedDynOptData;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector oc) {
+this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final I element) {
+// Aggregate key frequency data.
+Object key = ((Pair) element).left();
 
 Review comment:
   final?
   (and also the other local variables)
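A minimal sketch of what the suggestion could look like, assuming the transform sums per-key counts. `KeyFrequencyAggregator` is a hypothetical stand-in, `Map.Entry` takes the place of Nemo's `Pair`, and the `merge` call is an assumed aggregation rule rather than the PR's actual logic. The relevant part is that every local is declared `final`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the aggregating transform; not the PR's class. */
final class KeyFrequencyAggregator {
  // Running total of counts per key (assumed shape of the aggregated data).
  private final Map<Object, Long> aggregated = new HashMap<>();

  /** Adds one (key, count) element to the running totals. */
  void onData(final Map.Entry<?, Long> element) {
    // Locals are final, as the review comment asks.
    final Object key = element.getKey();
    final Long count = element.getValue();
    aggregated.merge(key, count, Long::sum);
  }

  Map<Object, Long> result() {
    return aggregated;
  }

  public static void main(final String[] args) {
    final KeyFrequencyAggregator aggregator = new KeyFrequencyAggregator();
    aggregator.onData(new SimpleEntry<>("a", 2L));
    aggregator.onData(new SimpleEntry<>("a", 3L));
    aggregator.onData(new SimpleEntry<>("b", 1L));
    System.out.println(aggregator.result());
  }
}
```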




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214381290
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamKVDecoderFactory.java
 ##
 @@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.KVDecoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
 
 Review comment:
   Duplicate comment.




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214384524
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/KVEncoderFactory.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A kv encoder factory object which generates kv encoders that encode values 
of type {@code T} into byte streams.
+ * To avoid to generate instance-based coder such as Spark serializer for 
every encoding,
+ * user need to explicitly instantiate an kv encoder instance and use it.
+ *
+ * @param  element type.
+ */
+public interface KVEncoderFactory extends EncoderFactory {
 
 Review comment:
   I feel this adds a lot of boilerplate code.
   
   My understanding is that this PR takes this approach:
   ```
   beamKvCoder -> new NemoKVEncoderFactory(beamKvCoder.getKeyCoder(), beamKvCoder.getValueCoder())
   ```
   
   such that in the IR we can access the key and value coders while bypassing Beam-specific code.
   
   What do you think about the following alternative? It also bypasses Beam-specific code, perhaps with less code.
   
   ```
   NemoKeyCoderExtractor -> beamKvCoder.getKeyCoder() OR rddTupleCoder.getKeyCoder()
   ```
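
A rough sketch of the extractor idea, under stated assumptions: `KeyCoderExtractor`, `ToyKvCoder`, and `ToyTupleCoder` are hypothetical stand-ins (not Beam, Spark, or Nemo classes), and plain strings take the place of real coder objects. It only illustrates that the IR could depend on one small interface instead of a parallel KV encoder/decoder factory hierarchy.

```java
/** Hypothetical: pulls the key coder out of a frontend-specific KV coder. */
interface KeyCoderExtractor<C, K> {
  K extractKeyCoder(C kvCoder);
}

/** Toy stand-in for a Beam-style KV coder; not the real Beam class. */
final class ToyKvCoder {
  private final String keyCoder;
  private final String valueCoder;

  ToyKvCoder(final String keyCoder, final String valueCoder) {
    this.keyCoder = keyCoder;
    this.valueCoder = valueCoder;
  }

  String getKeyCoder() {
    return keyCoder;
  }

  String getValueCoder() {
    return valueCoder;
  }
}

/** Toy stand-in for an RDD tuple coder; not a real Spark class. */
final class ToyTupleCoder {
  private final String keyCoder;

  ToyTupleCoder(final String keyCoder) {
    this.keyCoder = keyCoder;
  }

  String getKeyCoder() {
    return keyCoder;
  }
}

final class KeyCoderExtractorDemo {
  /** IR-side code sees only the extractor, never the frontend coder type. */
  static <C> String keyCoderOf(final C coder, final KeyCoderExtractor<C, String> extractor) {
    return extractor.extractKeyCoder(coder);
  }

  public static void main(final String[] args) {
    final KeyCoderExtractor<ToyKvCoder, String> beamExtractor = ToyKvCoder::getKeyCoder;
    final KeyCoderExtractor<ToyTupleCoder, String> sparkExtractor = ToyTupleCoder::getKeyCoder;

    System.out.println(keyCoderOf(new ToyKvCoder("StringCoder", "LongCoder"), beamExtractor));
    System.out.println(keyCoderOf(new ToyTupleCoder("IntCoder"), sparkExtractor));
  }
}
```

Each frontend would supply a single method reference, so the trade-off is one extractor per frontend versus one encoder factory and one decoder factory per frontend.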




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214386640
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 ##
 @@ -231,27 +235,33 @@ public TaskExecutor(final Task task,
   private void processElementRecursively(final VertexHarness vertexHarness, 
final Object dataElement) {
 final IRVertex irVertex = vertexHarness.getIRVertex();
 final OutputCollectorImpl outputCollector = 
vertexHarness.getOutputCollector();
+
 if (irVertex instanceof SourceVertex) {
   outputCollector.emit(dataElement);
 } else if (irVertex instanceof OperatorVertex) {
   final Transform transform = ((OperatorVertex) irVertex).getTransform();
   transform.onData(dataElement);
-} else if (irVertex instanceof MetricCollectionBarrierVertex) {
-  outputCollector.emit(dataElement);
-  setIRVertexPutOnHold((MetricCollectionBarrierVertex) irVertex);
+} else if (irVertex instanceof MetricCollectionVertex) {
+  final Transform transform = ((MetricCollectionVertex) 
irVertex).getTransform();
+  transform.onData(dataElement);
 
 Review comment:
   MetricCollectionVertex probably should be an OperatorVertex.
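
To illustrate the point, here is a toy sketch with simplified stand-ins for `IRVertex`, `OperatorVertex`, `MetricCollectionVertex`, and `Transform`. These are not the Nemo classes; their shapes are assumed for the example. If the metric vertex extends the operator vertex, the existing `OperatorVertex` branch already dispatches to its transform and no separate `instanceof` branch is needed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for Nemo's Transform; only onData is modeled. */
interface Transform {
  void onData(Object element);
}

/** Toy stand-in for the IR vertex base class. */
abstract class IRVertex {
}

/** Toy stand-in: a vertex that owns a transform. */
class OperatorVertex extends IRVertex {
  private final Transform transform;

  OperatorVertex(final Transform transform) {
    this.transform = transform;
  }

  Transform getTransform() {
    return transform;
  }
}

/** Assumed shape: the metric vertex is just an operator vertex with a metric transform. */
final class MetricCollectionVertex extends OperatorVertex {
  MetricCollectionVertex(final Transform metricCollectTransform) {
    super(metricCollectTransform);
  }
}

final class DispatchDemo {
  /** Returns true if the element was handed to a transform. */
  static boolean process(final IRVertex irVertex, final Object dataElement) {
    if (irVertex instanceof OperatorVertex) {
      // Also covers MetricCollectionVertex, since it is a subclass.
      ((OperatorVertex) irVertex).getTransform().onData(dataElement);
      return true;
    }
    return false;
  }

  public static void main(final String[] args) {
    final List<Object> seen = new ArrayList<>();
    final IRVertex vertex = new MetricCollectionVertex(seen::add);
    System.out.println(process(vertex, "element") + " " + seen);
  }
}
```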




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214387288
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -55,14 +66,20 @@ public void emit(final O output) {
   emit((O) output);
 } else {
   // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-  final List dataElements = 
this.additionalTagElementsMap.get(dstVertexId);
-  if (dataElements == null) {
-throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
-  }
+  final List dataElements = 
getAdditionalTaggedDataFromDstVertexId(dstVertexId);
   dataElements.add(output);
 }
   }
 
+  public void printLog() {
 
 Review comment:
   Unused method




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214379961
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionVertex.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.DynamicOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+
+/**
+ * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
+ * This class is generated in the DAG through
+ * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
+ */
+public class MetricCollectionVertex extends IRVertex {
 
 Review comment:
   extends OperatorVertex




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214380246
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
 
 Review comment:
   This seems to describe the relay vertex?
   Can you briefly explain here what/how this vertex 'aggregates'?




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214387505
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
 ##
 @@ -278,28 +274,6 @@ public void writeBlock(final Block block,
 .setType(ControlMessage.MessageType.BlockStateChanged)
 .setBlockStateChangedMsg(blockStateChangedMsgBuilder.build())
 .build());
-
-if (reportPartitionSizes) {
 
 Review comment:
   This is great! :smile: 




[GitHub] johnyangk commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
johnyangk commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214387375
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -55,14 +66,20 @@ public void emit(final O output) {
   emit((O) output);
 } else {
   // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-  final List dataElements = 
this.additionalTagElementsMap.get(dstVertexId);
-  if (dataElements == null) {
 
 Review comment:
   Why remove this null check?
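
One way the check could survive the refactoring is to move it into the new helper. This is a sketch under assumptions: the diff does not show the body of `getAdditionalTaggedDataFromDstVertexId`, so the body below is a guess, and `TaggedOutputs` is a hypothetical simplification of `OutputCollectorImpl`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for the tagged-output part of OutputCollectorImpl. */
final class TaggedOutputs {
  private final Map<String, List<Object>> additionalTagElementsMap = new HashMap<>();

  TaggedOutputs(final List<String> dstVertexIds) {
    for (final String dstVertexId : dstVertexIds) {
      additionalTagElementsMap.put(dstVertexId, new ArrayList<>());
    }
  }

  /** Assumed helper body: the lookup keeps the original null check. */
  List<Object> getAdditionalTaggedDataFromDstVertexId(final String dstVertexId) {
    final List<Object> dataElements = additionalTagElementsMap.get(dstVertexId);
    if (dataElements == null) {
      throw new IllegalArgumentException("Wrong destination vertex id passed!");
    }
    return dataElements;
  }

  void emit(final String dstVertexId, final Object output) {
    getAdditionalTaggedDataFromDstVertexId(dstVertexId).add(output);
  }

  public static void main(final String[] args) {
    final TaggedOutputs outputs = new TaggedOutputs(Arrays.asList("vertex1"));
    outputs.emit("vertex1", "element");
    System.out.println(outputs.getAdditionalTaggedDataFromDstVertexId("vertex1"));
    try {
      outputs.emit("unknown", "element");
    } catch (final IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the check inside the helper, every caller fails fast on an unknown destination instead of hitting a `NullPointerException` at `dataElements.add(output)`.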

