[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-10 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209167886
 
 

 ##
 File path: examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java
 ##
 @@ -71,13 +69,4 @@ public void test() throws Exception {
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
-
-  @Test (timeout = TIMEOUT)
-  public void testSailfish() throws Exception {
 
 Review comment:
   I will re-enable the large shuffle policy tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-10 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209167702
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -74,12 +77,24 @@ public void clearMain() {
   }
 
   public void clearTag(final String tag) {
-    if (this.additionalTagElementsMap.get(tag) == null) {
+    if (this.mainTagOutputChildren.contains(tag)) {
       // This dstVertexId is for the main tag
       clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
       this.additionalTagElementsMap.get(tag).clear();
     }
   }
+
+  public List getMainTagOutputQueue() {
+    return mainTagElements;
+  }
+
+  public List getAdditionalTagOutputQueue(final String dstVertexId) {
+    if (this.mainTagOutputChildren.contains(dstVertexId)) {
+      return this.mainTagElements;
+    } else {
+      return this.additionalTagElementsMap.get(dstVertexId);
 
 Review comment:
   I will make these methods explicitly throw exceptions when an invalid vertex ID is passed.
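A minimal sketch of the fail-fast lookup promised in this comment, written as plain Java so it runs on its own. The field names mirror the diff, but the class `TagOutputQueues`, its constructor, and the choice of `IllegalArgumentException` are assumptions for illustration, not the actual `OutputCollectorImpl` change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified stand-in for the output queues in OutputCollectorImpl.
 * Field names follow the diff; the constructor and exception type are assumptions.
 */
final class TagOutputQueues {
  private final Set<String> mainTagOutputChildren;
  private final List<Object> mainTagElements = new ArrayList<>();
  private final Map<String, List<Object>> additionalTagElementsMap = new HashMap<>();

  TagOutputQueues(final Set<String> mainTagOutputChildren, final Set<String> additionalTagChildren) {
    this.mainTagOutputChildren = mainTagOutputChildren;
    // Pre-register one queue per additional-tag destination vertex.
    for (final String dstVertexId : additionalTagChildren) {
      additionalTagElementsMap.put(dstVertexId, new ArrayList<>());
    }
  }

  List<Object> getMainTagOutputQueue() {
    return mainTagElements;
  }

  /** Returns the queue for dstVertexId, failing fast instead of returning null. */
  List<Object> getAdditionalTagOutputQueue(final String dstVertexId) {
    if (mainTagOutputChildren.contains(dstVertexId)) {
      return mainTagElements;
    }
    final List<Object> queue = additionalTagElementsMap.get(dstVertexId);
    if (queue == null) {
      throw new IllegalArgumentException("Unknown destination vertex id: " + dstVertexId);
    }
    return queue;
  }

  /** Reuses the checked lookup, so an unknown tag also throws here. */
  void clearTag(final String tag) {
    getAdditionalTagOutputQueue(tag).clear();
  }
}
```

Routing the main-tag case through the same checked getter keeps `clearTag` and the getters consistent: a misspelled vertex ID surfaces as an exception at the call site rather than as a later `NullPointerException`.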


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-10 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209166646
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java
 ##
 @@ -26,7 +26,8 @@
   @Override
   public Object extractKey(final Object element) {
     if (element instanceof KV) {
-      return ((KV) element).getKey();
+      final Object key = ((KV) element).getKey();
+      return key == null ? 0 : key;
 
 Review comment:
   Thank you, I will leave a comment.
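A minimal, hypothetical sketch of the null-key handling in this hunk, together with the kind of explanatory comment it could carry. `java.util.Map.Entry` stands in for Beam's `KV` so the sketch runs without a Beam dependency; the class name and the non-pair branch (returning the element itself) are assumptions, since the diff shows only the `KV` branch.

```java
import java.util.Map;

/**
 * Hypothetical null-safe key extractor. Map.Entry stands in for Beam's KV
 * so that the example runs without Beam on the classpath.
 */
final class NullSafeKeyExtractor {
  /** Key substituted for null keys (0, as in the diff above). */
  static final Object NULL_KEY_SUBSTITUTE = 0;

  Object extractKey(final Object element) {
    if (element instanceof Map.Entry) {
      final Object key = ((Map.Entry<?, ?>) element).getKey();
      // Beam's WriteFiles can emit pairs with a null key. Mapping them to a fixed,
      // non-null key keeps downstream hashCode() calls from throwing NullPointerException.
      return key == null ? NULL_KEY_SUBSTITUTE : key;
    }
    // Assumption: elements that are not pairs are used as their own key.
    return element;
  }
}
```

One consequence of this design is that every null-keyed pair lands in the same partition, which is acceptable as long as such pairs are rare or small.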


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-07 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208424286
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
 ##
 @@ -117,4 +117,16 @@ public boolean isEmpty(final String tag) {
       return this.additionalTagOutputQueues.get(tag).isEmpty();
     }
   }
+
+  public Queue getMainTagOutputQueue() {
+    return mainTagOutputQueue;
+  }
+
+  public Queue getAdditionalTagOutputQueue(final String dstVertexId) {
 
 Review comment:
   This is invoked for every element. It supports the newly introduced way of implementing `processElement` with additional output tags. A detailed example can be found [here](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L219).
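To illustrate why this getter sits on the per-element path, here is a pure-Java simulation of a multi-output `processElement`: every emitted element triggers a fresh queue lookup by tag, so the lookup should be a cheap hash access (compare the `String#hashCode()` remark in the earlier `clearTag` hunk). `TaggedOutputs`, `SplitByLength`, the tag strings, and the length cutoff are all hypothetical; this is not Beam's `MultiOutputReceiver` API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical per-tag output queues; output() performs one lookup per emitted element. */
final class TaggedOutputs {
  static final String MAIN_TAG = "main";

  private final Map<String, Queue<Object>> queuesByTag = new HashMap<>();

  /** O(1) average lookup; queues are created lazily for simplicity. */
  Queue<Object> get(final String tag) {
    return queuesByTag.computeIfAbsent(tag, t -> new ArrayDeque<>());
  }

  /** Routes a single element to the queue for its tag. */
  void output(final String tag, final Object element) {
    get(tag).add(element);
  }
}

/** Hypothetical DoFn-like logic: short words go to the main tag, lengths of long words to a side tag. */
final class SplitByLength {
  static final String LONG_WORD_LENGTHS_TAG = "longWordLengths";

  static void processElement(final String word, final int cutOff, final TaggedOutputs out) {
    if (word.length() <= cutOff) {
      out.output(TaggedOutputs.MAIN_TAG, word);
    } else {
      out.output(LONG_WORD_LENGTHS_TAG, word.length());
    }
  }
}
```

For example, processing `"a"`, `"bb"`, and `"cccc"` with a cutoff of 2 puts the first two words in the main queue and the length 4 in the side queue, with one tag lookup per word.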


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-07 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208206155
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
 ##
 @@ -52,8 +56,11 @@ public void prepare(final Context context, final OutputCollector oc) {
 
   @Override
   public void onData(final I element) {
-    WindowedValue data = WindowedValue.valueInGlobalWindow(element);
-    windowed.add(data);
+    // Since CreateViewTransform takes KV(Void, value), this is okay
+    if (element instanceof KV) {
+      final KV kv = (KV) element;
+      multiView.getDataList().add(kv.getValue());
+    }
 
 Review comment:
   As for handling side inputs, I will take another careful look and see if I can make it more concise. Thanks for the comments.
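The new `onData` logic keeps only the value of each `(Void, value)` pair. The sketch below reproduces that behavior in plain Java; `ViewValueCollector` is a hypothetical name, and `java.util.Map.Entry` stands in for Beam's `KV` so the code runs without Beam. Non-pair elements are dropped, matching the visible part of the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the onData() logic: collect the values of (Void, value) pairs. */
final class ViewValueCollector {
  private final List<Object> dataList = new ArrayList<>();

  void onData(final Object element) {
    if (element instanceof Map.Entry) {
      // The key is always null (Void), so only the value carries information.
      dataList.add(((Map.Entry<?, ?>) element).getValue());
    }
    // Elements that are not pairs are ignored, as in the visible part of the diff.
  }

  /** Read-only view of the collected side-input values. */
  List<Object> getDataList() {
    return Collections.unmodifiableList(dataList);
  }
}
```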


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-07 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208205871
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
 ##
 @@ -249,7 +252,7 @@ public Instant timestamp() {
 
     @Override
     public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() in ProcessContext under DoTransform");
+      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
 
 Review comment:
   This is because WriteFiles#expand requires `c.pane()` in the `processContext` call.


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-07 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208205196
 
 

 ##
 File path: examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java
 ##
 @@ -94,7 +94,10 @@ public static PDone write(final PCollection dataToWrite,
       dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
       return PDone.in(dataToWrite.getPipeline());
     } else {
-      return dataToWrite.apply(TextIO.write().to(path));
+      // Added withWindowedWrites() to local file writes. This is necessary for FileResult coders.
+      // If not specified, FileResultCoder#encode will be blocked. See windowCoder in FileResultCoder#encode
 
 Review comment:
   Since this problem originates in Beam, it was difficult to pin down the exact cause. Execution blocks [here](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L1141) when trying to encode a FileResult.


[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0

2018-08-07 GitBox
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208204396
 
 

 ##
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java
 ##
 @@ -40,6 +40,11 @@ public HashPartitioner(final int dstParallelism,
 
   @Override
   public Integer partition(final Object element) {
-    return Math.abs(keyExtractor.extractKey(element).hashCode() % dstParallelism);
+    final Object key = keyExtractor.extractKey(element);
+    if (key == null) {
 
 Review comment:
   Beam's WriteFiles produces KV.of(null, something) during the sink. I will change BeamKeyExtractor to return 0 for pairs with a null key.
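A minimal sketch of hash partitioning that tolerates the null keys WriteFiles produces. `NullSafeHashPartitioner` is a hypothetical name, a `java.util.function.Function` stands in for Nemo's key extractor, and sending null keys to partition 0 is an assumption chosen to agree with the BeamKeyExtractor change (the `Integer` key 0 also hashes to partition 0).

```java
import java.util.function.Function;

/** Hypothetical null-tolerant hash partitioner; not Nemo's actual HashPartitioner. */
final class NullSafeHashPartitioner {
  private final int dstParallelism;
  private final Function<Object, Object> keyExtractor;

  NullSafeHashPartitioner(final int dstParallelism, final Function<Object, Object> keyExtractor) {
    if (dstParallelism <= 0) {
      throw new IllegalArgumentException("dstParallelism must be positive");
    }
    this.dstParallelism = dstParallelism;
    this.keyExtractor = keyExtractor;
  }

  int partition(final Object element) {
    final Object key = keyExtractor.apply(element);
    if (key == null) {
      // Calling hashCode() on a null key would throw NullPointerException.
      return 0;
    }
    // The remainder lies in (-dstParallelism, dstParallelism), so Math.abs cannot overflow
    // and the result is always a valid partition index.
    return Math.abs(key.hashCode() % dstParallelism);
  }
}
```

`Math.floorMod(key.hashCode(), dstParallelism)` would also stay in range, but it maps negative hashes to different partitions than `Math.abs(... % ...)`, so switching would reshuffle keys.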