[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209167886

## File path: examples/beam/src/test/java/edu/snu/nemo/examples/beam/PartitionWordsByLengthITCase.java ##
@@ -71,13 +69,4 @@ public void test() throws Exception {
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
   }
-
-  @Test (timeout = TIMEOUT)
-  public void testSailfish() throws Exception {

Review comment:
I will re-enable the large shuffle policy tests.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209167702

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java ##
@@ -74,12 +77,24 @@ public void clearMain() {
   }
   public void clearTag(final String tag) {
-    if (this.additionalTagElementsMap.get(tag) == null) {
+    if (this.mainTagOutputChildren.contains(tag)) {
       // This dstVertexId is for the main tag
       clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
       this.additionalTagElementsMap.get(tag).clear();
     }
   }
+
+  public List getMainTagOutputQueue() {
+    return mainTagElements;
+  }
+
+  public List getAdditionalTagOutputQueue(final String dstVertexId) {
+    if (this.mainTagOutputChildren.contains(dstVertexId)) {
+      return this.mainTagElements;
+    } else {
+      return this.additionalTagElementsMap.get(dstVertexId);

Review comment:
I will make these methods explicitly throw exceptions when a wrong vertex id is passed.
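The review comment on `OutputCollectorImpl` above promises that the queue accessors will throw when an unknown vertex id is passed. A minimal sketch of what that could look like follows. It is not the code that was merged: the class `TaggedOutputCollectorSketch`, its constructor, the generic parameter, and the choice of `IllegalArgumentException` are all assumptions made for illustration; only the field and method names are taken from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for OutputCollectorImpl (not the actual Nemo class). */
final class TaggedOutputCollectorSketch<O> {
  private final Set<String> mainTagOutputChildren;
  private final List<O> mainTagElements = new ArrayList<>();
  private final Map<String, List<O>> additionalTagElementsMap = new HashMap<>();

  // Assumed constructor: the real class may be wired up differently.
  TaggedOutputCollectorSketch(final List<String> mainChildren,
                              final List<String> additionalChildren) {
    this.mainTagOutputChildren = new HashSet<>(mainChildren);
    for (final String child : additionalChildren) {
      additionalTagElementsMap.put(child, new ArrayList<>());
    }
  }

  List<O> getMainTagOutputQueue() {
    return mainTagElements;
  }

  List<O> getAdditionalTagOutputQueue(final String dstVertexId) {
    if (mainTagOutputChildren.contains(dstVertexId)) {
      // This dstVertexId is for the main tag.
      return mainTagElements;
    }
    final List<O> queue = additionalTagElementsMap.get(dstVertexId);
    if (queue == null) {
      // Fail loudly instead of handing null back to the caller.
      throw new IllegalArgumentException("Unknown destination vertex id: " + dstVertexId);
    }
    return queue;
  }

  void clearTag(final String tag) {
    // Reuses the checked lookup, so an unknown tag also throws here.
    getAdditionalTagOutputQueue(tag).clear();
  }
}
```

With this shape, a typo in a vertex id surfaces as an exception naming the offending id at the lookup site, rather than as a `NullPointerException` later when the caller uses the returned list.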
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r209166646

## File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamKeyExtractor.java ##
@@ -26,7 +26,8 @@
   @Override
   public Object extractKey(final Object element) {
     if (element instanceof KV) {
-      return ((KV) element).getKey();
+      final Object key = ((KV) element).getKey();
+      return key == null ? 0 : key;

Review comment:
Thank you, I will leave a comment.
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208424286

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java ##
@@ -117,4 +117,16 @@ public boolean isEmpty(final String tag) {
       return this.additionalTagOutputQueues.get(tag).isEmpty();
     }
   }
+
+  public Queue getMainTagOutputQueue() {
+    return mainTagOutputQueue;
+  }
+
+  public Queue getAdditionalTagOutputQueue(final String dstVertexId) {

Review comment:
This is invoked for every element. It is a newly introduced way of implementing `processElement` with additional output tags. A detailed example can be found [here](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L219).
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208206155

## File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/CreateViewTransform.java ##
@@ -52,8 +56,11 @@ public void prepare(final Context context, final OutputCollector oc) {
   @Override
   public void onData(final I element) {
-    WindowedValue data = WindowedValue.valueInGlobalWindow(element);
-    windowed.add(data);
+    // Since CreateViewTransform takes KV(Void, value), this is okay
+    if (element instanceof KV) {
+      final KV kv = (KV) element;
+      multiView.getDataList().add(kv.getValue());
+    }

Review comment:
For handling sideInputs, I will look carefully again and see if I can make it more concise. Thanks for the comments.
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208205871

## File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java ##
@@ -249,7 +252,7 @@ public Instant timestamp() {
     @Override
     public PaneInfo pane() {
-      throw new UnsupportedOperationException("pane() in ProcessContext under DoTransform");
+      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);

Review comment:
This is because WriteFiles#expand requires `c.pane()` in the `processContext` call.
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208205196

## File path: examples/beam/src/main/java/edu/snu/nemo/examples/beam/GenericSourceSink.java ##
@@ -94,7 +94,10 @@ public static PDone write(final PCollection dataToWrite,
       dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
       return PDone.in(dataToWrite.getPipeline());
     } else {
-      return dataToWrite.apply(TextIO.write().to(path));
+      // Added withWindowedWrites() to local file writes. This is necessary for FileResult coders.
+      // If not specified, FileResultCoder#encode will be blocked. See windowCoder in FileResultCoder#encode

Review comment:
Since this problem originates in Beam, it was difficult to fully pin down the cause. Execution blocks [here](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L1141) when trying to encode FileResult.
[GitHub] shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
shpark commented on a change in pull request #91: [NEMO-3] Bump up the Beam version to 2.5.0
URL: https://github.com/apache/incubator-nemo/pull/91#discussion_r208204396

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partitioner/HashPartitioner.java ##
@@ -40,6 +40,11 @@ public HashPartitioner(final int dstParallelism,
   @Override
   public Integer partition(final Object element) {
-    return Math.abs(keyExtractor.extractKey(element).hashCode() % dstParallelism);
+    final Object key = keyExtractor.extractKey(element);
+    if (key == null) {

Review comment:
Beam's WriteFiles produces KV.of(null, something) during the sink. I will change BeamKeyExtractor to return 0 for null key pairs.
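The approach described in the comment above (map a null key to 0 in the key extractor so the partitioner never calls `hashCode()` on null) can be sketched without any Beam dependency. This is an illustration under stated assumptions, not the Nemo code: `java.util.Map.Entry` stands in for Beam's `KV`, the names `KeyExtractorSketch`, `NullSafeKeyExtractorSketch`, and `HashPartitionerSketch` are hypothetical, and returning the element itself for non-pair input is an assumption about the extractor's fallback.

```java
import java.util.Map;

/** Hypothetical counterpart of the key extractor interface. */
interface KeyExtractorSketch {
  Object extractKey(Object element);
}

/** Maps a null key to 0, mirroring `key == null ? 0 : key` from the BeamKeyExtractor diff. */
final class NullSafeKeyExtractorSketch implements KeyExtractorSketch {
  @Override
  public Object extractKey(final Object element) {
    // Map.Entry is used here only as a stand-in for Beam's KV.
    if (element instanceof Map.Entry) {
      final Object key = ((Map.Entry<?, ?>) element).getKey();
      return key == null ? 0 : key;
    }
    // Assumed fallback: treat the element itself as the key.
    return element;
  }
}

/** Hash partitioner using the same expression as the removed line in the HashPartitioner diff. */
final class HashPartitionerSketch {
  private final int dstParallelism;
  private final KeyExtractorSketch keyExtractor;

  HashPartitionerSketch(final int dstParallelism, final KeyExtractorSketch keyExtractor) {
    this.dstParallelism = dstParallelism;
    this.keyExtractor = keyExtractor;
  }

  int partition(final Object element) {
    // Safe for null-keyed pairs because the extractor has already replaced null with 0.
    return Math.abs(keyExtractor.extractKey(element).hashCode() % dstParallelism);
  }
}
```

Handling null in the extractor keeps the Beam-specific quirk in the Beam frontend, so the runtime partitioner can stay unchanged. A side effect is that all null-keyed pairs hash to the same partition, which matches the stated intent of returning 0 for them.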