[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214889097
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
 ##
 @@ -84,6 +85,11 @@ public Encoder create(final OutputStream outputStream) {
   return dummyEncoder;
 }
 
+@Override
+public Object getCoder() {
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214886675
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 ##
 @@ -15,58 +15,51 @@
  */
 package edu.snu.nemo.common.ir.vertex.transform;
 
-import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.function.BiFunction;
 
 /**
- * A {@link Transform} relays input data from upstream vertex to downstream 
vertex promptly.
- * This transform can be used for merging input data into the {@link 
OutputCollector}.
- * @param  input/output type.
+ * A {@link Transform} that collects task-level statistics used for dynamic 
optimization.
+ * The collected statistics is sent to vertex with {@link 
AggregateMetricTransform} as a tagged output
+ * when this transform is closed.
+ *
+ * @param  input type.
+ * @param  output type.
  */
-public final class MetricCollectTransform implements Transform {
+public final class MetricCollectTransform implements Transform {
   private static final Logger LOG = 
LoggerFactory.getLogger(MetricCollectTransform.class.getName());
-  private OutputCollector outputCollector;
-  private final String dstvertexId;
-  private final KeyExtractor keyExtractor;
-  private Map dynOptData;
+  private OutputCollector outputCollector;
+  private O dynOptData;
+  private BiFunction dynOptDataCollector;
+  private BiFunction closer;
 
 Review comment:
   final?
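
   If these two functions are only assigned once in the constructor, they could be declared `final`, while the accumulator itself stays mutable. A rough sketch of what I mean; the class name `MetricCollectSketch`, the type parameters, and the constructor signature are assumptions for illustration, not the PR's actual code:

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for MetricCollectTransform; the generic parameters are assumed.
final class MetricCollectSketch<I, O> {
  // Reassigned on every element, so this field cannot be final.
  private O dynOptData;
  // Assigned exactly once in the constructor, so this field can be final.
  private final BiFunction<I, O, O> dynOptDataCollector;

  MetricCollectSketch(final O initialData, final BiFunction<I, O, O> collector) {
    this.dynOptData = initialData;
    this.dynOptDataCollector = collector;
  }

  // Folds one input element into the accumulated statistics.
  void onData(final I element) {
    dynOptData = dynOptDataCollector.apply(element, dynOptData);
  }

  O getDynOptData() {
    return dynOptData;
  }
}
```

   With `final`, the compiler rejects any later reassignment of the collector function.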




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214375796
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/KVDecoderFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A kv decoder factory object which generates kv decoders that decode byte 
streams to values of type {@code T}.
+ * To avoid generating instance-based coder such as Spark serializer for every 
decoding,
+ * user need to explicitly instantiate an kv decoder instance and use it.
+ *
+ * @param  element type.
+ */
+public interface KVDecoderFactory extends DecoderFactory {
+  @Override
+  KVDecoder create(InputStream inputStream) throws IOException;
+  DecoderFactory getKeyDecoderFactory();
+
+  /**
+   * Interface of the Decoder.
+   *
+   * @param  element type.
+   */
+  interface KVDecoder extends Decoder {
 
 Review comment:
   Why do we need this interface?




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214377843
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/AggregationBarrierVertex.java
 ##
 @@ -15,55 +15,48 @@
  */
 package edu.snu.nemo.common.ir.vertex;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.DynamicOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
  * This class is generated in the DAG through
  * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
- * @param  type of the key of metric data.
- * @param  type of the value of metric data.
  */
-public final class MetricCollectionBarrierVertex extends IRVertex {
-  // Metric data used for dynamic optimization.
-  private Map metricData;
-  private final List blockIds;
-
+public final class AggregationBarrierVertex extends IRVertex {
 
 Review comment:
   It seems that this class is tightly coupled to `DataSkewCompositePass`.
   If so, do you have any plan to decouple it in a separate issue (like [NEMO-60](https://issues.apache.org/jira/browse/NEMO-60))? Please leave some TODOs.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214385677
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
 ##
 @@ -99,6 +108,10 @@ OutputCollectorImpl getOutputCollector() {
 return additionalTagOutputChildren;
   }
 
+  public Map getTagToAdditionalChildrenId() {
 
 Review comment:
   Please add a comment for this public method.
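
   Something like the Javadoc below would be enough. This is only a sketch: `VertexHarnessSketch` is a hypothetical stand-in, and the `Map<String, String>` type and the described meaning of the map (tag to child vertex id) are my assumptions based on the method name:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for VertexHarness; the map's key and value types are assumed.
final class VertexHarnessSketch {
  private final Map<String, String> tagToAdditionalChildrenId;

  VertexHarnessSketch(final Map<String, String> tagToAdditionalChildrenId) {
    // Defensive copy so later changes to the argument do not leak in.
    this.tagToAdditionalChildrenId = new HashMap<>(tagToAdditionalChildrenId);
  }

  /**
   * @return a read-only map from each additional output tag to the id of the child vertex
   *         that consumes the output emitted with that tag.
   */
  Map<String, String> getTagToAdditionalChildrenId() {
    return Collections.unmodifiableMap(tagToAdditionalChildrenId);
  }
}
```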




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214379645
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionVertex.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.DynamicOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+
+/**
+ * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
+ * This class is generated in the DAG through
+ * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
+ */
+public class MetricCollectionVertex extends IRVertex {
+  // This DAG snapshot is taken at the end of the DataSkewCompositePass, for 
the vertex to know the state of the DAG at
+  // its optimization, and to be able to figure out exactly where in the DAG 
the vertex exists.
+  private final Transform transform;
+  private final String dstVertexId;
 
 Review comment:
   What do you mean by `dstVertexId`? Why does this vertex have to know the
"destination vertex"? Does this vertex always have a single output edge?




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214383091
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamKVEncoderFactory.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.coder.KVEncoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link EncoderFactory} from {@link Coder}.
+ * @param  the type of element to encode.
+ */
+public final class BeamKVEncoderFactory implements KVEncoderFactory {
 
 Review comment:
   How about inheriting from `BeamEncoderFactory`? It seems most of the code is
duplicated.
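
   Roughly, the KV variant would only add what is KV-specific. A minimal sketch, assuming the base class can be made non-final; the class names are hypothetical and a plain `String` stands in for the Beam `Coder` to keep the example self-contained:

```java
// Hypothetical, simplified base class standing in for BeamEncoderFactory.
class EncoderFactoryBaseSketch {
  private final String coderName;

  EncoderFactoryBaseSketch(final String coderName) {
    this.coderName = coderName;
  }

  // Shared behavior lives in the base class only once.
  String describe() {
    return "EncoderFactory(" + coderName + ")";
  }
}

// The KV variant inherits the shared behavior and adds access to the key encoder factory.
final class KVEncoderFactorySketch extends EncoderFactoryBaseSketch {
  private final EncoderFactoryBaseSketch keyEncoderFactory;

  KVEncoderFactorySketch(final String kvCoderName, final String keyCoderName) {
    super(kvCoderName);
    this.keyEncoderFactory = new EncoderFactoryBaseSketch(keyCoderName);
  }

  EncoderFactoryBaseSketch getKeyEncoderFactory() {
    return keyEncoderFactory;
  }
}
```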




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214376120
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/KVDecoderFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A kv decoder factory object which generates kv decoders that decode byte 
streams to values of type {@code T}.
+ * To avoid generating instance-based coder such as Spark serializer for every 
decoding,
+ * user need to explicitly instantiate an kv decoder instance and use it.
+ *
+ * @param  element type.
+ */
+public interface KVDecoderFactory extends DecoderFactory {
+  @Override
+  KVDecoder create(InputStream inputStream) throws IOException;
+  DecoderFactory getKeyDecoderFactory();
+
+  /**
+   * Interface of the Decoder.
+   *
+   * @param  element type.
+   */
+  interface KVDecoder extends Decoder {
+/**
+ * Decodes the given value onto the specified input stream.
+ * It has to be able to decode the given stream consequently by calling 
this method repeatedly.
+ * Because the user can want to keep a single input stream and 
continuously concatenate elements,
+ * the input stream should not be closed.
+ *
+ * @throws IOException if fail to decode
+ */
+T decode() throws IOException;
+  }
+
+  /**
+   * Dummy kv decoder factory.
+   */
+  KVDecoderFactory DUMMY_KVDECODER_FACTORY = new DummyKVDecoderFactory();
+
+  /**
+   * Dummy kv decoder factory implementation which is not supposed to be used.
+   */
+  final class DummyKVDecoderFactory implements KVDecoderFactory {
+private final DecoderFactory keyDecoderFactory = null;
 
 Review comment:
   How about using a dummy decoder factory instead of `null`?
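
   The idea is that `getKeyDecoderFactory()` never hands out `null`, so a misuse fails with a clear message instead of a `NullPointerException` somewhere downstream. A rough sketch with simplified, hypothetical interfaces (the `*Sketch` names and the `Object`-returning `create` are assumptions, not the PR's types):

```java
import java.io.InputStream;

// Simplified, hypothetical version of DecoderFactory.
interface DecoderFactorySketch {
  Object create(InputStream inputStream);
}

// Placeholder factory that fails loudly when it is actually used.
final class DummyDecoderFactorySketch implements DecoderFactorySketch {
  @Override
  public Object create(final InputStream inputStream) {
    throw new UnsupportedOperationException("Dummy decoder factory must not be used.");
  }
}

final class DummyKVDecoderFactorySketch implements DecoderFactorySketch {
  // Non-null placeholder instead of null.
  private final DecoderFactorySketch keyDecoderFactory = new DummyDecoderFactorySketch();

  @Override
  public Object create(final InputStream inputStream) {
    throw new UnsupportedOperationException("Dummy kv decoder factory must not be used.");
  }

  DecoderFactorySketch getKeyDecoderFactory() {
    return keyDecoderFactory;
  }
}
```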




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214381215
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
 ##
 @@ -60,6 +67,55 @@ private EmptyComponents() {
 return dagBuilder.build();
   }
 
+  public static DAG buildEmptyDAGForSkew() {
 
 Review comment:
   Please add a comment for this public method.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214380077
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/AggregationBarrierVertex.java
 ##
 @@ -15,55 +15,48 @@
  */
 package edu.snu.nemo.common.ir.vertex;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.DynamicOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
  * This class is generated in the DAG through
  * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
- * @param  type of the key of metric data.
- * @param  type of the value of metric data.
  */
-public final class MetricCollectionBarrierVertex extends IRVertex {
-  // Metric data used for dynamic optimization.
-  private Map metricData;
-  private final List blockIds;
-
+public final class AggregationBarrierVertex extends IRVertex {
   // This DAG snapshot is taken at the end of the DataSkewCompositePass, for 
the vertex to know the state of the DAG at
   // its optimization, and to be able to figure out exactly where in the DAG 
the vertex exists.
+  private final Transform transform;
   private DAG dagSnapshot;
 
 Review comment:
   Why do we need this snapshot? If it is not needed, please remove it.
   If it is needed, let's think about another way. It seems that having a 
snapshot of IRDAG in an `IRVertex` is not a good design.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214386573
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionVertex.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.DynamicOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+
+/**
+ * IRVertex that collects statistics to send them to the optimizer for dynamic 
optimization.
+ * This class is generated in the DAG through
+ * 
{edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
+ */
+public class MetricCollectionVertex extends IRVertex {
+  // This DAG snapshot is taken at the end of the DataSkewCompositePass, for 
the vertex to know the state of the DAG at
+  // its optimization, and to be able to figure out exactly where in the DAG 
the vertex exists.
+  private final Transform transform;
+  private final String dstVertexId;
 
 Review comment:
   Why don't we extend `OperatorVertex`?




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214380685
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link Transform} relays input data from upstream vertex to downstream 
vertex promptly.
+ * This transform can be used for merging input data into the {@link 
OutputCollector}.
+ * @param  input/output type.
+ */
+public final class MetricCollectTransform implements Transform {
 
 Review comment:
   It seems that this class is tightly coupled to `DataSkewCompositePass`.
   If so, do you have any plan to decouple it in a separate issue (like [NEMO-60](https://issues.apache.org/jira/browse/NEMO-60))? Please leave some TODOs.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214386899
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -25,22 +28,30 @@
  * @param  output type.
  */
 public final class OutputCollectorImpl implements OutputCollector {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
   private final Set mainTagOutputChildren;
   // Use ArrayList (not Queue) to allow 'null' values
   private final ArrayList mainTagElements;
-  private final Map> additionalTagElementsMap;
+  // Key: Pair of tag and destination vertex id
+  // Value: data elements which will be input to the tagged destination vertex
+  private final Map, ArrayList> 
additionalTaggedChildToElementsMap;
 
   /**
* Constructor of a new OutputCollectorImpl with tagged outputs.
* @param mainChildren   main children vertices
-   * @param taggedChildren additional children vertices
+   * @param tagToChildren additional children vertices
*/
   public OutputCollectorImpl(final Set mainChildren,
- final List taggedChildren) {
+ final Map tagToChildren) {
 this.mainTagOutputChildren = mainChildren;
 this.mainTagElements = new ArrayList<>(1);
-this.additionalTagElementsMap = new HashMap<>();
-taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, 
new ArrayList<>(1)));
+this.additionalTaggedChildToElementsMap = new HashMap<>();
+tagToChildren.forEach((tag, child) ->
+  this.additionalTaggedChildToElementsMap.put(Pair.of(tag, child), new 
ArrayList<>(1)));
+  }
+
+  public Map, ArrayList> 
getAdditionalTaggedChildToElementsMap() {
 
 Review comment:
   Please add a comment for this public method.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214387852
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 ##
 @@ -310,21 +320,59 @@ private void doExecute() {
 }
   }
 
+  public void sendDynOptData(final Object element) {
 
 Review comment:
   Please add a comment for this public method.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214376305
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/KVEncoderFactory.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A kv encoder factory object which generates kv encoders that encode values 
of type {@code T} into byte streams.
+ * To avoid to generate instance-based coder such as Spark serializer for 
every encoding,
+ * user need to explicitly instantiate an kv encoder instance and use it.
+ *
+ * @param  element type.
+ */
+public interface KVEncoderFactory extends EncoderFactory {
+  @Override
+  KVEncoder create(OutputStream outputStream) throws IOException;
+  EncoderFactory getKeyEncoderFactory();
+
+  /**
+   * Interface of the Encoder.
+   *
+   * @param  element type.
+   */
+  interface KVEncoder extends Encoder {
+/**
+ * Encodes the given value onto the specified output stream.
+ * It has to be able to encode the given stream consequently by calling 
this method repeatedly.
+ * Because the user may want to keep a single output stream and 
continuously concatenate elements,
+ * the output stream should not be closed.
+ *
+ * @param element the element to be encoded
+ * @throws IOException if fail to encode
+ */
+void encode(T element) throws IOException;
+  }
+
+  /**
+   * Dummy kv encoder factory.
+   */
+  KVEncoderFactory DUMMY_KVENCODER_FACTORY = new DummyKVEncoderFactory();
+
+  /**
+   * Dummy kv encoder factory implementation which is not supposed to be used.
+   */
+  final class DummyKVEncoderFactory implements KVEncoderFactory {
+private final EncoderFactory keyEncoderFactory = null;
 
 Review comment:
   Ditto.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214375049
 
 

 ##
 File path: common/pom.xml
 ##
 @@ -52,5 +52,10 @@ limitations under the License.
 ${hadoop.version}
 provided
 
+
+org.apache.beam
+beam-sdks-java-core
 
 Review comment:
   Why is this needed?




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214387359
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -55,14 +66,20 @@ public void emit(final O output) {
   emit((O) output);
 } else {
   // Note that String#hashCode() can be cached, thus accessing additional 
output queues can be fast.
-  final List dataElements = 
this.additionalTagElementsMap.get(dstVertexId);
-  if (dataElements == null) {
-throw new IllegalArgumentException("Wrong destination vertex id 
passed!");
-  }
+  final List dataElements = 
getAdditionalTaggedDataFromDstVertexId(dstVertexId);
   dataElements.add(output);
 }
   }
 
+  public void printLog() {
 
 Review comment:
   The method name seems too general.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214382513
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamKVDecoderFactory.java
 ##
 @@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.beam.coder;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.KVDecoderFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
+ * @param  the type of element to decode.
+ */
+/**
+ * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
+ * @param  the type of element to decode.
+ */
+public final class BeamKVDecoderFactory implements KVDecoderFactory {
 
 Review comment:
   Please update the comment.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214383751
 
 

 ##
 File path: compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
 ##
 @@ -22,8 +22,6 @@
 
 /**
  * Pass to modify the DAG for a job to perform data skew.
- * It adds a {@link edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex} before Shuffle edges,
- * to make a barrier before it, and to use the metrics to repartition the skewed data.
  * NOTE: we currently put the SkewCompositePass at the end of the list for each policies, as it needs to take a
 
 Review comment:
   Please update the description (rather than just removing it).




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214376259
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/KVEncoderFactory.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A kv encoder factory object which generates kv encoders that encode values of type {@code T} into byte streams.
+ * To avoid to generate instance-based coder such as Spark serializer for every encoding,
+ * user need to explicitly instantiate an kv encoder instance and use it.
+ *
+ * @param  element type.
+ */
+public interface KVEncoderFactory extends EncoderFactory {
+  @Override
+  KVEncoder create(OutputStream outputStream) throws IOException;
+  EncoderFactory getKeyEncoderFactory();
+
+  /**
+   * Interface of the Encoder.
+   *
+   * @param  element type.
+   */
+  interface KVEncoder extends Encoder {
 
 Review comment:
   Ditto.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214378240
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionVertex.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex;
+
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.DynamicOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+
+/**
+ * IRVertex that collects statistics to send them to the optimizer for dynamic optimization.
+ * This class is generated in the DAG through
+ * {edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
 
 Review comment:
   Please distinguish this class from `AggregationBarrierVertex` in the comment.




[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-08-31 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize 
DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214386600
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
+ * This transform can be used for merging input data into the {@link OutputCollector}.
+ * @param  input type.
+ * @param  output type.
+ */
+public final class AggregateMetricTransform implements Transform {
 
 Review comment:
   Why don't we extend `OperatorVertex`?

