[GitHub] sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index f41146b64..f3caf7dd4 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -157,9 +157,9 @@ def dot(self):
 label += '{}:{}'.format(transform_name, class_name)
 except:
 pass
-if ('class' in self.properties and self.properties['class'] == 'MetricCollectionBarrierVertex'):
+if ('class' in self.properties and self.properties['class'] == 'AggregationBarrierVertex'):
 shape = ', shape=box'
-label += 'MetricCollectionBarrier'
+label += 'AggregationBarrier'
 else:
 shape = ''
 try:
diff --git a/common/pom.xml b/common/pom.xml
index da5a48c80..18ef10b02 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -52,5 +52,11 @@ limitations under the License.
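       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <version>${beam.version}</version>
+    </dependency>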
+
 
 
diff --git a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
index be7cf592c..23bc6abd3 100644
--- a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
@@ -24,6 +24,7 @@
 public interface KeyExtractor extends Serializable {
   /**
    * Extracts key.
+   *
    * @param element Element to get the key from.
    * @return The extracted key of the element.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index dc67ff355..16fa877c5 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -20,8 +20,8 @@
 import java.io.Serializable;
 
 /**
- * A decoder factory object which generates decoders that decode values of type {@code T} into byte streams.
- * To avoid to generate instance-based coder such as Spark serializer for every decoding,
+ * A decoder factory object which generates decoders that decode byte streams into values of type {@code T}.
+ * To avoid generating instance-based coder such as Spark serializer for every decoding,
  * user need to instantiate a decoder instance and use it.
  *
  * @param <T> element type.
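The corrected Javadoc states that a decoder factory turns byte streams into values of type T, and that callers should instantiate one decoder and reuse it instead of building a coder for every element. A minimal sketch of that usage pattern follows; SketchDecoder, SketchDecoderFactory, and the other class names are hypothetical stand-ins rather than Nemo's actual interfaces, and DataInputStream takes the place of a real serializer such as Spark's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder: decodes one element of type T per call. */
interface SketchDecoder<T> {
  T decode() throws IOException;
}

/** Hypothetical factory: creates one decoder bound to a byte stream. */
interface SketchDecoderFactory<T> extends Serializable {
  SketchDecoder<T> create(InputStream inputStream);
}

/** Decodes big-endian ints; DataInputStream stands in for a real serializer. */
final class IntDecoderFactorySketch implements SketchDecoderFactory<Integer> {
  @Override
  public SketchDecoder<Integer> create(final InputStream inputStream) {
    // The wrapper is built once per decoder, not once per decoded element.
    final DataInputStream in = new DataInputStream(inputStream);
    return in::readInt;
  }
}

final class DecoderUsageSketch {
  /** Encodes ints so there is a byte stream to decode. */
  static byte[] encodeInts(final int... values) {
    try {
      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      final DataOutputStream out = new DataOutputStream(bytes);
      for (final int v : values) {
        out.writeInt(v);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes the whole stream with a single, reused decoder instance. */
  static List<Integer> decodeAll(final byte[] bytes) {
    final SketchDecoder<Integer> decoder =
        new IntDecoderFactorySketch().create(new ByteArrayInputStream(bytes));
    final List<Integer> result = new ArrayList<>();
    try {
      while (true) {
        result.add(decoder.decode());
      }
    } catch (final EOFException e) {
      return result; // end of the byte stream
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The factory itself is cheap and serializable, while the per-stream state lives in the decoder it creates, which is why reusing one decoder per stream avoids repeated setup.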
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
index d63fafb9c..82c3730c0 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -46,7 +46,7 @@
 
 /**
  * Encodes the given value onto the specified output stream.
- * It have to be able to encode the given stream consequently by calling this method repeatedly.
+ * It has to be able to encode the given stream consequently by calling this method repeatedly.
   * Because the user can want to keep a single output stream and continuously concatenate elements,
  * the output stream should not be closed.
  *
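The EncoderFactory Javadoc above requires that encode can be called repeatedly on the same output stream and that the stream is not closed by the encoder. A minimal sketch of that contract, assuming a hypothetical SketchEncoder interface (not Nemo's Encoder) and length-prefixed strings written with DataOutputStream.writeUTF: each call flushes but leaves the stream open, and only the stream's owner closes it at the end.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical encoder bound to one output stream. */
interface SketchEncoder<T> {
  void encode(T element) throws IOException;
}

final class StringEncoderSketch implements SketchEncoder<String> {
  private final DataOutputStream out;

  StringEncoderSketch(final OutputStream outputStream) {
    this.out = new DataOutputStream(outputStream);
  }

  @Override
  public void encode(final String element) throws IOException {
    // Length-prefixed write, so consecutive elements can be split apart later.
    out.writeUTF(element);
    // Flush so the bytes reach the underlying stream, but do NOT close it:
    // the caller may keep appending elements to the same stream.
    out.flush();
  }
}

final class EncoderUsageSketch {
  /** Encodes all elements onto one stream, then reads them back. */
  static List<String> roundTrip(final String... elements) {
    try {
      final ByteArrayOutputStream sink = new ByteArrayOutputStream();
      final SketchEncoder<String> encoder = new StringEncoderSketch(sink);
      for (final String e : elements) {
        encoder.encode(e); // repeated calls on the same, still-open stream
      }
      final DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(sink.toByteArray()));
      final List<String> decoded = new ArrayList<>();
      while (in.available() > 0) {
        decoded.add(in.readUTF());
      }
      sink.close(); // the owner of the stream closes it once, at the end
      return decoded;
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Length-prefixing is one way to keep concatenated elements separable; a real coder may use a different framing.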
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
new file mode 100644
index 0..4335413a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecoderFactory} which is used for long.
+ */
+public final class LongDecoderFactory implements DecoderFactory<Long> {
+
+  private static final LongDecoderFactory LONG_DECODER_FACTORY = new LongDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private LongDecoderFactory() {
+ 
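The archived diff is cut off inside the private constructor, so the rest of LongDecoderFactory is not shown here. Based only on the visible parts (a private constructor, a static LONG_DECODER_FACTORY instance, and imports of DataInputStream and InputStream), a minimal sketch of a singleton long decoder factory might look like the following; the of() accessor, LongSketchDecoder, and the other names are hypothetical, not the file's actual contents.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical decoder interface for longs (not Nemo's exact API). */
interface LongSketchDecoder {
  long decode() throws IOException;
}

final class LongDecoderFactorySketch {
  // Single shared instance, mirroring the LONG_DECODER_FACTORY field in the diff.
  private static final LongDecoderFactorySketch INSTANCE = new LongDecoderFactorySketch();

  private LongDecoderFactorySketch() {
    // Private constructor: callers go through of() instead of new.
  }

  /** Hypothetical accessor name; the truncated diff does not show the real one. */
  static LongDecoderFactorySketch of() {
    return INSTANCE;
  }

  LongSketchDecoder create(final InputStream inputStream) {
    final DataInputStream in = new DataInputStream(inputStream);
    return in::readLong; // reads 8 big-endian bytes per call
  }
}

final class LongDecoderDemo {
  /** Writes two longs, then decodes them with the shared factory. */
  static long[] decodeTwo(final long a, final long b) {
    try {
      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      final DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(a);
      out.writeLong(b);
      out.flush();
      final LongSketchDecoder decoder =
          LongDecoderFactorySketch.of().create(new ByteArrayInputStream(bytes.toByteArray()));
      return new long[] {decoder.decode(), decoder.decode()};
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the factory holds no state, one shared instance is enough; each call to create still returns a fresh decoder bound to its own stream.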

[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214889097
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
 ##
 @@ -84,6 +85,11 @@ public Encoder create(final OutputStream outputStream) {
   return dummyEncoder;
 }
 
+@Override
+public Object getCoder() {
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

2018-09-04 Thread GitBox
sanha commented on a change in pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex
URL: https://github.com/apache/incubator-nemo/pull/115#discussion_r214886675
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 ##
 @@ -15,58 +15,51 @@
  */
 package edu.snu.nemo.common.ir.vertex.transform;
 
-import edu.snu.nemo.common.KeyExtractor;
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.function.BiFunction;
 
 /**
- * A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
- * This transform can be used for merging input data into the {@link OutputCollector}.
- * @param  input/output type.
+ * A {@link Transform} that collects task-level statistics used for dynamic optimization.
+ * The collected statistics is sent to vertex with {@link AggregateMetricTransform} as a tagged output
+ * when this transform is closed.
+ *
+ * @param <I> input type.
+ * @param <O> output type.
  */
-public final class MetricCollectTransform implements Transform {
+public final class MetricCollectTransform implements Transform {
   private static final Logger LOG = LoggerFactory.getLogger(MetricCollectTransform.class.getName());
-  private OutputCollector outputCollector;
-  private final String dstvertexId;
-  private final KeyExtractor keyExtractor;
-  private Map dynOptData;
+  private OutputCollector outputCollector;
+  private O dynOptData;
+  private BiFunction dynOptDataCollector;
+  private BiFunction closer;
 
 Review comment:
   final?
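The new Javadoc describes a transform that accumulates task-level statistics element by element and sends them as a tagged output when it is closed, and the review asks whether the new fields can be final. A minimal sketch of that shape, assuming the collector and closer functions are supplied through the constructor (so they can be final) while the accumulated statistics and the output collector are assigned later and stay mutable; MetricCollectSketch, SketchOutputCollector, emitTagged, and the "metrics" tag are hypothetical names, not Nemo's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical collector that receives tagged outputs. */
interface SketchOutputCollector<O> {
  void emitTagged(String tag, O output);
}

final class MetricCollectSketch<I, O> {
  // Assigned once in the constructor, so these can be final.
  private final BiFunction<I, O, O> dynOptDataCollector;
  private final BiFunction<O, SketchOutputCollector<O>, O> closer;
  // Reassigned after construction, so these cannot be final.
  private O dynOptData;
  private SketchOutputCollector<O> outputCollector;

  MetricCollectSketch(final O initial,
                      final BiFunction<I, O, O> dynOptDataCollector,
                      final BiFunction<O, SketchOutputCollector<O>, O> closer) {
    this.dynOptData = initial;
    this.dynOptDataCollector = dynOptDataCollector;
    this.closer = closer;
  }

  void prepare(final SketchOutputCollector<O> collector) {
    this.outputCollector = collector;
  }

  /** Folds one input element into the running statistics. */
  void onData(final I element) {
    dynOptData = dynOptDataCollector.apply(element, dynOptData);
  }

  /** Hands the final statistics to the closer, which emits them. */
  void close() {
    dynOptData = closer.apply(dynOptData, outputCollector);
  }
}

final class MetricCollectDemo {
  /** Counts records per key and emits the counts under a tag on close. */
  static Map<String, Map<String, Long>> run(final String... words) {
    final Map<String, Map<String, Long>> tagged = new HashMap<>();
    final MetricCollectSketch<String, Map<String, Long>> transform =
        new MetricCollectSketch<String, Map<String, Long>>(
            new HashMap<>(),
            (word, counts) -> {
              counts.merge(word, 1L, Long::sum); // per-key record count
              return counts;
            },
            (counts, collector) -> {
              collector.emitTagged("metrics", counts); // hypothetical tag name
              return counts;
            });
    transform.prepare(tagged::put);
    for (final String w : words) {
      transform.onData(w);
    }
    transform.close();
    return tagged;
  }
}
```

In this sketch only the two BiFunction fields qualify for final; dynOptData is reassigned on every element, and the output collector is only known once prepare is called.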

