[GitHub] wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README
wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447261325 Lovely! It looks good to me. I'll merge after the tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447259465 Oh, and I forgot to mention, but making an über jar is the right direction. I think
[GitHub] kennknowles commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447259985 OK, all the edits are done, I think.
[GitHub] wonook commented on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests
wonook commented on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447259465 Oh, and I forgot to mention, but making an über jar is the right direction.
[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102 @kennknowles I think referring to the `pom.xml` of our `nemo-examples-beam` package could help in getting the deps & bundling & distribution right. To my knowledge, it is capable of running various Beam programs with the specified modules. To provide a bit more information, the `com.github.fommil.netlib` library was used to implement the ALS example, and the other dependencies serve other purposes, so those are irrelevant here. So I think adding the following lines to your `pom.xml` will help. I hope this helps!
```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102 @kennknowles I think referring to the `pom.xml` of our `nemo-examples-beam` package could help in getting the deps & bundling & distribution right. To my knowledge, it is capable of running various Beam programs with the specified modules. To provide a bit more information, the `com.github.fommil.netlib` library was used to implement the ALS example, so it is irrelevant here, but try adding the following lines to your `pom.xml`. I hope this helps!
```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
[GitHub] wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README
wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447257364 @kennknowles Thanks for the quick reaction! Just one favor to ask regarding the PR: in the README, there's an `Examples` section near the very bottom of the document. Would it be possible to make the same changes there as well? I think it would be nice to handle all of the relevant issues at once. I'll squash & merge the PR as soon as that's handled, so you won't need to rebase your commits. Thanks a lot!
[GitHub] wonook commented on issue #182: (WIP) Add Beam spec tests
wonook commented on issue #182: (WIP) Add Beam spec tests URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102 @kennknowles I think referring to the `pom.xml` of our `nemo-examples-beam` package could help in getting the deps & bundling & distribution right. To my knowledge, it is capable of running various Beam programs with the specified modules. To provide a bit more information, the `com.github.fommil.netlib` library was used to implement the ALS example, so it is irrelevant here, but try adding the following lines to your `pom.xml`. I hope this helps!
```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-sql</artifactId>
  <version>${beam.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```
[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676353

## File path: README.md ##
```diff
@@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
   -deploy_mode yarn \
   -job_id mr_transient \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
Done.
[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676326

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
   -optimization_policy org.apache.nemo.compiler.optimizer.policy.DefaultPolicy \
   -user_main org.apache.nemo.examples.beam.WordCount \
   -user_args "`pwd`/examples/resources/test_input_wordcount `pwd`/examples/resources/test_output_wordcount"
```
Review comment:
Done.
[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676368

## File path: README.md ##
```diff
@@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
   -deploy_mode yarn \
   -job_id mr_transient \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
Done.
[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676124

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
Done.
[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676093

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
Done.
[GitHub] wonook commented on issue #181: [NEMO-159] Nemo Web UI
wonook commented on issue #181: [NEMO-159] Nemo Web UI URL: https://github.com/apache/incubator-nemo/pull/181#issuecomment-447252608 @seojangho I've updated the tests. Please check again!
[GitHub] kennknowles opened a new pull request #182: (WIP) Add Beam spec tests
kennknowles opened a new pull request #182: (WIP) Add Beam spec tests URL: https://github.com/apache/incubator-nemo/pull/182 Opening this for early feedback. I spent a little while trying to get this to run, but did not finish it. I hope that some member of the Nemo community can easily point to my error.

In this draft:

- `compiler/frontend/beam/spectestsbundle` is an über jar with everything needed to run the tests
- `compiler/frontend/beam/spectests` is the Maven Surefire config to run the tests

I still get `ClassNotFound: CoderRegistry` in every test case, so the deps & bundling & distribution of the code are clearly still not right.

Previously, I tried doing this inline in `compiler/frontend/beam/pom.xml`. I got the same error and thought it might be something to do with how the jars are used to execute the tests, so (like most distributed big data systems) I built more logic to create an über jar. The simpler version is here: https://github.com/kennknowles/incubator-nemo/tree/first-try-spectests
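A `ClassNotFound` for `CoderRegistry` typically means the class is not visible to the class loader that actually runs the tests, which is not the same as it being missing from the build. A small probe can help narrow that down. The sketch below is a hypothetical helper (not part of Nemo, Beam, or Surefire); the only assumption about Beam is the fully qualified name `org.apache.beam.sdk.coders.CoderRegistry`.

```java
/** Hypothetical diagnostic helper; not part of Nemo or Beam. */
final class ClasspathCheck {
  private ClasspathCheck() {
  }

  /**
   * Returns true if the named class can be loaded through the given loader.
   * The class is not initialized, so its static initializers do not run.
   */
  static boolean isLoadable(final String className, final ClassLoader loader) {
    try {
      Class.forName(className, false, loader);
      return true;
    } catch (final ClassNotFoundException | LinkageError e) {
      // ClassNotFoundException: not visible at all.
      // LinkageError: found, but something it depends on is missing.
      return false;
    }
  }

  public static void main(final String[] args) {
    final ClassLoader loader = ClasspathCheck.class.getClassLoader();
    // Assumed fully qualified name of Beam's CoderRegistry.
    final String name = "org.apache.beam.sdk.coders.CoderRegistry";
    System.out.println(name + " loadable: " + isLoadable(name, loader));
  }
}
```

Calling `isLoadable` from inside a test method (so it sees the same class loader as the failing tests), and again from `main` with the über jar on `-cp`, may show whether the problem lies in the bundling or in how the tests are launched.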
[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README
johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241665971

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
   -optimization_policy org.apache.nemo.compiler.optimizer.policy.DefaultPolicy \
   -user_main org.apache.nemo.examples.beam.WordCount \
   -user_args "`pwd`/examples/resources/test_input_wordcount `pwd`/examples/resources/test_output_wordcount"
```
Review comment:
Please change this line to:
```bash
-user_args "`pwd`/examples/resources/inputs/test_input_wordcount `pwd`/outputs/wordcount"
```
(1) the `inputs` directory was also added in #146, and (2) the `outputs` directory is easier to find.
[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README
johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666839

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
executor -> executors
[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README
johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241667063

## File path: README.md ##
```diff
@@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
   -deploy_mode yarn \
   -job_id mr_transient \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
executor -> executors
[GitHub] wonook commented on a change in pull request #180: Fix path to beam resources in examples in README
wonook commented on a change in pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666959

## File path: README.md ##
```diff
@@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
   -deploy_mode yarn \
   -job_id mr_transient \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
`executors` here as well.
[GitHub] wonook commented on a change in pull request #180: Fix path to beam resources in examples in README
wonook commented on a change in pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666859

## File path: README.md ##
```diff
@@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
   -job_id mr_default \
-  -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+  -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
```
Review comment:
I think it should be `executors` here!
[GitHub] seojangho commented on issue #181: [NEMO-159] Nemo Web UI
seojangho commented on issue #181: [NEMO-159] Nemo Web UI URL: https://github.com/apache/incubator-nemo/pull/181#issuecomment-447240855 License check, please. All files should be licensed to ASF.
[GitHub] wonook opened a new pull request #181: [NEMO-159] Nemo Web UI
wonook opened a new pull request #181: [NEMO-159] Nemo Web UI URL: https://github.com/apache/incubator-nemo/pull/181

JIRA: [NEMO-159: Nemo Web UI](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-159)

**Major changes:**
- Provides structure to the Web UI application
- Creates a number of views (Stages View, Storage View, Jobs View, etc.)
- Major update to the UI/UX of the Web UI to make visualizations easily accessible to users
- Major redesign of the DAG visualization, to provide appropriate labels for the transforms

**Minor changes to note:**
- Various code cleanups

**Tests for the changes:**
- Existing tests verify the code.

**Other comments:**
- None

Closes #
[GitHub] kennknowles opened a new pull request #180: Fix path to beam resources in examples in README
kennknowles opened a new pull request #180: Fix path to beam resources in examples in README URL: https://github.com/apache/incubator-nemo/pull/180 Files in `examples/beam/resources` were moved in #146 but the commands in the README were not updated.
[GitHub] johnyangk commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo
johnyangk commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo URL: https://github.com/apache/incubator-nemo/pull/95#issuecomment-447217829 Copyright looks good
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644565

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ##
```diff
@@ -134,14 +131,15 @@ public void onData(final WindowedValue> element) {
   /**
    * Process the collected data and trigger timers.
-   * @param inputWatermark current input watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
-                                               final Instant processingTime,
+  private void processElementsAndTriggerTimers(final Instant processingTime,
                                                final Instant synchronizedTime) {
-    for (final Map.Entry>> entry : keyToValues.entrySet()) {
```
Review comment:
This is to avoid iterating over 'empty' keys (e.g., keys from previous windows that never appear in the current window).
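The motivation can be illustrated with a buffer that forgets a key once its values have been processed, so a later trigger pass only visits keys that received data since the previous pass. This is a hypothetical sketch (`KeyedBuffer`, `add`, and `processAll` are illustrative names), not the code in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical per-key buffer whose keys are dropped once their values are processed. */
final class KeyedBuffer<K, V> {
  private final Map<K, List<V>> keyToValues = new HashMap<>();

  void add(final K key, final V value) {
    keyToValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /**
   * Hands every buffered (key, values) pair to the processor, then clears the map
   * so the next pass does not revisit keys that received no new data.
   * Returns the number of keys processed.
   */
  int processAll(final BiConsumer<K, List<V>> processor) {
    final int processedKeys = keyToValues.size();
    keyToValues.forEach(processor);
    keyToValues.clear();
    return processedKeys;
  }

  int bufferedKeyCount() {
    return keyToValues.size();
  }
}
```

The cost of each pass then scales with the number of keys seen since the last pass rather than with every key ever observed.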
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241641897

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/NemoTimerInternals.java ##
```diff
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.*;
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.state.*;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Nemo timer internals that add/remove timer data to timer context.
+ * @param key type
+ */
+public final class NemoTimerInternals implements TimerInternals {
+
+  /** The current set timers by namespace and ID. */
+  private final Table existingTimers = HashBasedTable.create();
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  private final K key;
+  private final ContextForTimer context;
+
+  public NemoTimerInternals(final K key,
+                            final ContextForTimer context) {
+    this.key = key;
+    this.context = context;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  public void setCurrentOutputWatermarkTime(final Instant time) {
+    outputWatermarkTime = time;
+  }
+
+  @Override
+  public void setTimer(
+    final StateNamespace namespace, final String timerId, final Instant target, final TimeDomain timeDomain) {
+    setTimer(TimerInternals.TimerData.of(timerId, namespace, target, timeDomain));
+  }
+
+  /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
+  @Deprecated
+  @Override
+  public void setTimer(final TimerInternals.TimerData timerData) {
+    @Nullable
+    final TimerInternals.TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId());
+    if (existing == null) {
+      existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+      context.addTimer(key, timerData);
+    } else {
+      checkArgument(
+        timerData.getDomain().equals(existing.getDomain()),
+        "Attempt to set %s for time domain %s, but it is already set for time domain %s",
+        timerData.getTimerId(),
+        timerData.getDomain(),
+        existing.getDomain());
+
+      if (!timerData.getTimestamp().equals(existing.getTimestamp())) {
+        context.removeTimer(key, existing);
```
Review comment:
Is it okay to remove the existing timer? Can you add a comment?
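One way to read the removal: re-setting the same (namespace, ID) with a new timestamp acts as a reset, so the stale entry has to leave the time-ordered queue or it would still fire at the old time. The simplified, hypothetical registry below illustrates that behavior; `TimerRegistry` and its `Timer` stand-in are illustrative and are not Beam's `TimerData`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical, simplified timer registry: at most one pending timer per (namespace, id). */
final class TimerRegistry {
  /** Minimal stand-in for a timer: namespace, id and a timestamp in millis. */
  static final class Timer implements Comparable<Timer> {
    final String namespace;
    final String id;
    final long timestamp;

    Timer(final String namespace, final String id, final long timestamp) {
      this.namespace = namespace;
      this.id = id;
      this.timestamp = timestamp;
    }

    @Override
    public int compareTo(final Timer other) {
      // Total order: timestamp first, then namespace and id as tie-breakers.
      int c = Long.compare(timestamp, other.timestamp);
      if (c == 0) {
        c = namespace.compareTo(other.namespace);
      }
      if (c == 0) {
        c = id.compareTo(other.id);
      }
      return c;
    }
  }

  private final Map<List<String>, Timer> bySlot = new HashMap<>();
  private final NavigableSet<Timer> byTime = new TreeSet<>();

  void setTimer(final Timer timer) {
    final Timer existing = bySlot.put(Arrays.asList(timer.namespace, timer.id), timer);
    if (existing != null && existing.timestamp != timer.timestamp) {
      // Re-setting with a new timestamp replaces the old timer, so drop the stale entry.
      byTime.remove(existing);
    }
    byTime.add(timer);
  }

  int pendingCount() {
    return byTime.size();
  }

  long earliestTimestamp() {
    return byTime.first().timestamp;
  }
}
```

Beam's own `InMemoryTimerInternals` appears to handle a changed timestamp the same way (remove the old entry, add the new one), which may be worth citing in the requested code comment after checking the Beam version in use.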
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241639326

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/ContextForTimer.java ##
```diff
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.nemo.common.Pair;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * This class contains necessary data for timers.
+ * @param key type
+ */
+final class ContextForTimer {
+  // Pending input watermark timers of all keys, in timestamp order.
+  private final NavigableSet> watermarkTimers;
+
+  // Pending processing time timers of all keys, in timestamp order.
+  private final NavigableSet> processingTimers;
+
+  // Pending synchronized processing time timers of all keys, in timestamp order.
+  private final NavigableSet> synchronizedProcessingTimers;
+
+  // Current input watermark.
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // Current processing time.
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // Current synchronized processing time.
+  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // map that holds timer internals of each key
+  private final Map timerInternalsMap;
+
+  /**
+   * This comparator first compares the timer data, in timestamp order.
+   * If two timer data have the same timestamp, we order them by key.
+   * As the key is not comparable, we convert it to string and compare the string value.
+   * In fact, the key ordering is not important, because the first ordering is determined by the timestamp.
+   * We only have to check whether the two keys are the same or not.
+   */
+  private final Comparator> comparator = (o1, o2) -> {
+    final int comp = o1.right().compareTo(o2.right());
+    if (comp == 0) {
+      // if two timer are the same, compare key
+      if (o1.left() == null && o2.left() == null) {
+        return 0;
+      } else if (o1.left() == null || o2.left() == null) {
+        return -1;
+      } else if (o1.left().equals(o2.left())) {
+        return 0;
+      } else {
+        return o1.left().toString().compareTo(o2.left().toString());
```
Review comment:
This can return 0 for two different objects, violating the contract of `equals`. Would it make sense to return a static number (e.g., 1) here since the ordering of the keys doesn't matter?
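The concern is concrete because `TreeSet` treats a comparator result of 0 as "same element" and silently drops the second one. The sketch below is hypothetical (`Key`, `Entry`, and the two factory methods are illustrative, with a plain `long` timestamp standing in for the timer data). It shows the `toString()` tie-break losing an entry, and one alternative tie-break: a per-key rank assigned the first time a key is seen, which returns 0 only for equal keys.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

final class TimerOrdering {
  /** Hypothetical key type: distinct ids, but every instance has the same toString(). */
  static final class Key {
    final int id;

    Key(final int id) {
      this.id = id;
    }

    @Override
    public boolean equals(final Object o) {
      return o instanceof Key && ((Key) o).id == id;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(id);
    }

    @Override
    public String toString() {
      return "Key";
    }
  }

  /** (key, timestamp) pair standing in for a (key, timer) pair. */
  static final class Entry {
    final Key key;
    final long timestamp;

    Entry(final Key key, final long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }
  }

  /** Timestamp first, then toString(): distinct keys with equal strings compare as 0. */
  static Comparator<Entry> byTimestampThenString() {
    return Comparator.<Entry>comparingLong(e -> e.timestamp)
        .thenComparing(e -> e.key.toString());
  }

  /** Timestamp first, then a rank assigned on first sight; 0 only when the keys are equal. */
  static Comparator<Entry> byTimestampThenRank() {
    final Map<Key, Long> ranks = new HashMap<>();
    final long[] nextRank = {0L};
    return Comparator.<Entry>comparingLong(e -> e.timestamp)
        .thenComparingLong(e -> ranks.computeIfAbsent(e.key, k -> nextRank[0]++));
  }

  /** Adds two entries with distinct keys and equal timestamps; returns how many the set keeps. */
  static int keptEntries(final Comparator<Entry> comparator) {
    final TreeSet<Entry> set = new TreeSet<>(comparator);
    set.add(new Entry(new Key(1), 10L));
    set.add(new Entry(new Key(2), 10L));
    return set.size();
  }
}
```

A constant such as 1 would also keep both entries, but it breaks the `Comparator` requirement that `sgn(compare(x, y)) == -sgn(compare(y, x))`, so lookups such as `remove` may fail to find an entry; the `return -1` branch for a single null key has the same problem. The rank map in this sketch grows with every distinct key, so a real implementation would need to prune it when keys go away.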
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645005

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ##
```diff
@@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   protected void beforeClose() {
     // Finish any pending windows by advancing the input watermark to infinity.
-    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   /**
-   * Trigger times for current key.
+   * Trigger times if next timers exist.
    * When triggering, it emits the windowed data to downstream operators.
-   * @param key key
-   * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void triggerTimers(final K key,
-                             final Watermark watermark,
-                             final Instant processingTime,
-                             final Instant synchronizedTime) {
-    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-    try {
-      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
-      timerInternals.advanceProcessingTime(processingTime);
-      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
+  private int triggerTimers(final Instant processingTime,
+                            final Instant synchronizedTime) {
+    final ContextForTimer context = inMemoryTimerInternalsFactory.context;
+    context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp()));
```
Review comment:
Maybe this lets us do without the `inputWatermark` field variable?
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645416

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##
@@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   protected void beforeClose() {
     // Finish any pending windows by advancing the input watermark to infinity.
-    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }

   /**
-   * Trigger times for current key.
+   * Trigger times if next timers exist.
    * When triggering, it emits the windowed data to downstream operators.
-   * @param key key
-   * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void triggerTimers(final K key,
-                             final Watermark watermark,
-                             final Instant processingTime,
-                             final Instant synchronizedTime) {
-    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-    try {
-      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
-      timerInternals.advanceProcessingTime(processingTime);
-      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
+  private int triggerTimers(final Instant processingTime,
+                            final Instant synchronizedTime) {
+    final ContextForTimer context = inMemoryTimerInternalsFactory.context;
+    context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp()));
+    context.setCurrentProcessingTime(processingTime);
+    context.setCurrentSynchronizedProcessingTime(synchronizedTime);

-    final List timerDataList = getEligibleTimers(timerInternals);
+    // get next eligible timers
+    final List> timers = getEligibleTimers();

-    if (!timerDataList.isEmpty()) {
+    for (final Pair timer : timers) {
       // Trigger timers and emit windowed data
       final KeyedWorkItem timerWorkItem =
-        KeyedWorkItems.timersWorkItem(key, timerDataList);
+        KeyedWorkItems.timersWorkItem(timer.left(), Collections.singletonList(timer.right()));

Review comment:
Minor question: Would it be worthwhile to group timers by keys first and then invoke `processElement` to minimize the number of invocations?
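One way to act on the grouping question is to bucket the eligible timers by key before building work items. Each key then gets a single `KeyedWorkItems.timersWorkItem(key, timers)` call instead of one call per timer. The sketch below is hypothetical. It uses `String` keys and `Long` timestamps in place of `K` and `TimerData`, and a plain `Map.Entry` in place of Nemo's `Pair`. A `LinkedHashMap` keeps keys in the order their first eligible timer appears, so timers within each key stay in the timestamp order of the input.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class GroupTimersByKeySketch {
  /**
   * Groups (key, timer) pairs, already sorted by timestamp, into one list per key.
   * LinkedHashMap keeps keys in first-seen order, and each list keeps the input order.
   */
  static <K, T> Map<K, List<T>> groupByKey(final List<Map.Entry<K, T>> eligibleTimers) {
    final Map<K, List<T>> grouped = new LinkedHashMap<>();
    for (final Map.Entry<K, T> timer : eligibleTimers) {
      grouped.computeIfAbsent(timer.getKey(), k -> new ArrayList<>()).add(timer.getValue());
    }
    return grouped;
  }

  /** Sample input: key "a" has two eligible timers, key "b" has one. */
  static Map<String, List<Long>> sample() {
    final List<Map.Entry<String, Long>> eligible = new ArrayList<>();
    eligible.add(new SimpleImmutableEntry<>("a", 10L));
    eligible.add(new SimpleImmutableEntry<>("b", 10L));
    eligible.add(new SimpleImmutableEntry<>("a", 20L));
    return groupByKey(eligible);
  }

  public static void main(final String[] args) {
    // Each entry would become one work item for its key, carrying all of that key's timers.
    sample().forEach((key, timers) -> System.out.println(key + " -> " + timers));
  }
}
```

Grouping costs one small map per trigger in exchange for fewer `processElement` invocations. Whether it pays off depends on how many timers per key typically fire together, which this thread does not measure.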
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644760

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##
@@ -52,15 +53,11 @@
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
   private Watermark prevOutputWatermark;
   private final Map keyAndWatermarkHoldMap;
+  private final WindowingStrategy windowingStrategy;

Review comment:
unused variable
[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644905

## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##
@@ -52,15 +53,11 @@
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
   private Watermark prevOutputWatermark;
   private final Map keyAndWatermarkHoldMap;
+  private final WindowingStrategy windowingStrategy;
+  private Watermark inputWatermark;

Review comment:
Remove this variable and reuse `inMemoryTimerInternalsFactory.context#currentInputWatermarkTime`?
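Here is a minimal sketch of the suggestion. It assumes a context object like `ContextForTimer` becomes the single owner of the current input watermark. `TimerContext`, `getCurrentInputWatermarkTime`, and `WatermarkOwnerSketch` are hypothetical names. `java.time.Instant` stands in for Joda's `Instant`, and `Instant.MAX` stands in for `BoundedWindow.TIMESTAMP_MAX_VALUE`. The transform writes each new watermark into the context and reads it back from there, so no separate `inputWatermark` field can drift out of sync.

```java
import java.time.Instant;

public final class WatermarkOwnerSketch {
  /** Hypothetical stand-in for ContextForTimer: the only holder of the input watermark. */
  static final class TimerContext {
    private Instant currentInputWatermarkTime = Instant.MIN;

    void setCurrentInputWatermarkTime(final Instant time) {
      this.currentInputWatermarkTime = time;
    }

    Instant getCurrentInputWatermarkTime() {
      return currentInputWatermarkTime;
    }
  }

  private final TimerContext context = new TimerContext();

  /** Called on each new input watermark; the value is stored only in the context. */
  void onWatermark(final long watermarkMillis) {
    context.setCurrentInputWatermarkTime(Instant.ofEpochMilli(watermarkMillis));
  }

  /** Mirrors beforeClose(): advance the watermark to the maximum before the final trigger. */
  void beforeClose() {
    context.setCurrentInputWatermarkTime(Instant.MAX);
  }

  /** Timer-triggering code reads the watermark back instead of keeping its own copy. */
  Instant currentInputWatermark() {
    return context.getCurrentInputWatermarkTime();
  }

  public static void main(final String[] args) {
    final WatermarkOwnerSketch transform = new WatermarkOwnerSketch();
    transform.onWatermark(1_000L);
    System.out.println(transform.currentInputWatermark());
    transform.beforeClose();
    System.out.println(transform.currentInputWatermark());
  }
}
```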
[GitHub] seojangho closed pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable
seojangho closed pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable URL: https://github.com/apache/incubator-nemo/pull/179

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/nemo/common/Cloneable.java b/common/src/main/java/org/apache/nemo/common/Cloneable.java
index f1c3eab41..5c32bfde8 100644
--- a/common/src/main/java/org/apache/nemo/common/Cloneable.java
+++ b/common/src/main/java/org/apache/nemo/common/Cloneable.java
@@ -29,7 +29,7 @@
  *
  * @param the type of objects that this class can clone
  */
-public interface Cloneable> {
+public interface Cloneable {

   /**
    * Creates and returns a copy of this object.
[GitHub] seojangho commented on issue #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable
seojangho commented on issue #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable URL: https://github.com/apache/incubator-nemo/pull/179#issuecomment-446864939 Oops. This PR was pointless. Sorry.
[GitHub] seojangho commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo
seojangho commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo URL: https://github.com/apache/incubator-nemo/pull/95#issuecomment-446861615 @johnyangk @wonook Please check the updates. Thanks.
[GitHub] seojangho opened a new pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable
seojangho opened a new pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable URL: https://github.com/apache/incubator-nemo/pull/179

JIRA: [NEMO-318: Make org.apache.nemo.common.Cloneable inherit java.lang.Cloneable](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-318)

**Minor changes to note:**
- Made org.apache.nemo.common.Cloneable inherit java.lang.Cloneable, not itself
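Some background on what extending `java.lang.Cloneable` changes: it is a marker interface, and `Object.clone()` throws `CloneNotSupportedException` unless the object's class implements it. The sketch below is hypothetical. `SelfCloneable` and its `getClone()` method only approximate a self-typed interface like `nemo.common.Cloneable`, because its exact signature is not visible in this thread.

```java
public final class CloneableSketch {
  /** Hypothetical self-typed cloning interface that also extends the JDK marker interface. */
  interface SelfCloneable<T extends SelfCloneable<T>> extends java.lang.Cloneable {
    T getClone();
  }

  /** Uses Object.clone(), which succeeds because SelfCloneable extends java.lang.Cloneable. */
  static final class Config implements SelfCloneable<Config> {
    int parallelism;

    Config(final int parallelism) {
      this.parallelism = parallelism;
    }

    @Override
    public Config getClone() {
      try {
        return (Config) super.clone(); // shallow, field-by-field copy
      } catch (final CloneNotSupportedException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  public static void main(final String[] args) {
    final Config original = new Config(4);
    final Config copy = original.getClone();
    copy.parallelism = 8;
    // The copy is independent of the original.
    System.out.println(original.parallelism + " " + copy.parallelism);
  }
}
```

Suppose neither the interface nor `Config` implemented `java.lang.Cloneable`. Then the same `super.clone()` call would compile, but it would throw `CloneNotSupportedException` at run time.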
[GitHub] taegeonum opened a new pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
taegeonum opened a new pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform URL: https://github.com/apache/incubator-nemo/pull/178

JIRA: [NEMO-317: Optimize triggering logic in GroupByKeyAndWindowDoFnTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-317)

**Major changes:**
- Create `NemoTimerInternals` that adds timer data to the shared data structures of `ContextForTimer`.
- Remove timers from `ContextForTimer` and trigger them, instead of iterating all keys.
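The second change replaces a scan over every key with a lookup in shared, timestamp-ordered structures. Here is a minimal sketch of that idea. It assumes the timers of all keys live in one `TreeSet` ordered by timestamp and then by key, and that a timer fires once the watermark has passed its timestamp. Popping from the head of the set then touches only the timers that fire. `SharedTimerQueueSketch`, `Timer`, and `pollEligible` are hypothetical names. Keys are `String`s here, so the tie-breaker is a real ordering that is consistent with `equals`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public final class SharedTimerQueueSketch {
  /** Hypothetical timer entry: which key it belongs to and when it fires. */
  static final class Timer {
    final String key;
    final long timestamp;

    Timer(final String key, final long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return key + "@" + timestamp;
    }
  }

  // Timers of all keys in one set, ordered by timestamp and then by key.
  private final NavigableSet<Timer> timers = new TreeSet<>(
      Comparator.comparingLong((Timer t) -> t.timestamp).thenComparing(t -> t.key));

  void setTimer(final String key, final long timestamp) {
    timers.add(new Timer(key, timestamp));
  }

  /** Removes and returns every timer whose timestamp is strictly before the watermark. */
  List<Timer> pollEligible(final long watermark) {
    final List<Timer> eligible = new ArrayList<>();
    while (!timers.isEmpty() && timers.first().timestamp < watermark) {
      eligible.add(timers.pollFirst());
    }
    return eligible;
  }

  public static void main(final String[] args) {
    final SharedTimerQueueSketch queue = new SharedTimerQueueSketch();
    queue.setTimer("b", 20L);
    queue.setTimer("a", 10L);
    queue.setTimer("c", 30L);
    // Fires a@10 and b@20; c@30 stays queued until the watermark passes 30.
    System.out.println(queue.pollEligible(21L));
  }
}
```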
[GitHub] jeongyooneo closed pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
jeongyooneo closed pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder URL: https://github.com/apache/incubator-nemo/pull/166

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index b77c67f9e..ce18b0075 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,6 +140,16 @@ under the License.
       slf4j-log4j12
       ${slf4j.version}
+
+        org.slf4j
+        slf4j-simple
+        1.6.2
+        test
+
[GitHub] taegeonum closed pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side …
taegeonum closed pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side … URL: https://github.com/apache/incubator-nemo/pull/165

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
index f17f284ae..213f9a1b9 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
@@ -58,11 +58,16 @@ private ClientRPC(final TransportFactory transportFactory,
                     final LocalAddressProvider localAddressProvider,
                     @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost,
-                    @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) throws IOException {
+                    @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) {
     transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(),
         0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT);
     final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort);
-    link = transport.open(clientAddress, ENCODER, LINK_LISTENER);
+    try {
+      link = transport.open(clientAddress, ENCODER, LINK_LISTENER);
+    } catch (final IOException e) {
+      throw new IllegalStateException("Failed to setup an RPC connection to the Client. "
+          + "A failure at the client-side is suspected.");
+    }
   }

   /**
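The change converts the checked `IOException` thrown by `transport.open(...)` into an unchecked `IllegalStateException` with a clearer message. However, the diff does not pass the caught exception along, so the original stack trace is lost. Below is a minimal sketch of the same pattern that also keeps the cause. `Opener` is a hypothetical stand-in for the transport. `IllegalStateException(String, Throwable)` is a standard JDK constructor.

```java
import java.io.IOException;

public final class ConnectOrFailSketch {
  /** Hypothetical stand-in for transport.open(...), which may throw a checked IOException. */
  interface Opener<T> {
    T open() throws IOException;
  }

  /** Opens the link, or fails with a descriptive unchecked exception that keeps the cause. */
  static <T> T openOrFail(final Opener<T> opener) {
    try {
      return opener.open();
    } catch (final IOException e) {
      throw new IllegalStateException("Failed to setup an RPC connection to the Client. "
          + "A failure at the client-side is suspected.", e);
    }
  }

  public static void main(final String[] args) {
    try {
      openOrFail(() -> {
        throw new IOException("connection refused");
      });
    } catch (final IllegalStateException e) {
      // The wrapped IOException is still available for logging and debugging.
      System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
    }
  }
}
```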
[GitHub] taegeonum commented on issue #165: [NEMO-221] Confusing error messages for a failure at the client-side …
taegeonum commented on issue #165: [NEMO-221] Confusing error messages for a failure at the client-side … URL: https://github.com/apache/incubator-nemo/pull/165#issuecomment-446023531 LGTM. I'm merging it. Thanks @yunseong !
[GitHub] seojangho closed pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment
seojangho closed pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment URL: https://github.com/apache/incubator-nemo/pull/175
[GitHub] johnyangk closed pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode
johnyangk closed pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode URL: https://github.com/apache/incubator-nemo/pull/177

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 3e000d37f..7c6335751 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -341,7 +341,7 @@ private static void flattenTranslator(final PipelineTranslationContext ctx,

     // Check if the partial combining optimization can be applied.
     // If not, simply use the default Combine implementation by entering into it.
-    if (!isGlobalWindow(beamNode, ctx.getPipeline())) {
+    if (!(isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline()))) {
       // TODO #263: Partial Combining for Beam Streaming
       return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
     }
@@ -500,6 +500,7 @@ private static Transform createGBKTransform(
     }
   }

+
   /**
    * @param beamNode the beam node to be translated.
    * @param pipeline pipeline.
@@ -511,4 +512,16 @@ private static boolean isGlobalWindow(final TransformHierarchy.Node beamNode, fi
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     return mainInput.getWindowingStrategy().getWindowFn() instanceof GlobalWindows;
   }
+
+  /**
+   * @param beamNode the beam node to be translated.
+   * @param pipeline pipeline.
+   * @return true if the main input bounded.
+   */
+  private static boolean isMainInputBounded(final TransformHierarchy.Node beamNode, final Pipeline pipeline) {
+    final AppliedPTransform pTransform = beamNode.toAppliedPTransform(pipeline);
+    final PCollection mainInput = (PCollection)
+      Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+    return mainInput.isBounded() == PCollection.IsBounded.BOUNDED;
+  }
 }
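The fix changes when the partial-combining optimization is skipped. Before, only non-global windows fell back to the default `Combine`. Now an unbounded main input also falls back, even in the global window. The condition reduces to a small predicate. The sketch below is hypothetical: it takes the two checks as booleans instead of inspecting a Beam `TransformHierarchy.Node`, and it prints the before/after decision for every combination.

```java
public final class PartialCombineDecisionSketch {
  /**
   * Mirrors the condition in PipelineTranslator after the fix: partial combining is used
   * only when the main input is bounded AND uses the global window.
   */
  static boolean usePartialCombine(final boolean mainInputBounded, final boolean globalWindow) {
    return mainInputBounded && globalWindow;
  }

  /** The pre-fix condition, which looked at the windowing strategy alone. */
  static boolean usePartialCombineBeforeFix(final boolean globalWindow) {
    return globalWindow;
  }

  public static void main(final String[] args) {
    for (final boolean bounded : new boolean[] {true, false}) {
      for (final boolean global : new boolean[] {true, false}) {
        System.out.printf("bounded=%b global=%b before=%b after=%b%n",
            bounded, global, usePartialCombineBeforeFix(global), usePartialCombine(bounded, global));
      }
    }
  }
}
```

Only one row changes: an unbounded input in the global window, which is the streaming case NEMO-316 describes.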
[GitHub] taegeonum opened a new pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode
taegeonum opened a new pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode URL: https://github.com/apache/incubator-nemo/pull/177

JIRA: [NEMO-316: CombinePartial/FinalTransform is not disabled in streaming mode](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-316)

**Major changes:**
- Check whether the main input is bounded or not. The solution is brought from @johnyangk's nexmark branch.
[GitHub] taegeonum closed pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error
taegeonum closed pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error URL: https://github.com/apache/incubator-nemo/pull/176

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle.xml b/checkstyle.xml
index e3d0b2869..c7f876114 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -77,10 +77,6 @@ under the License.
- - - -
@@ -88,7 +84,7 @@ under the License.
- +
diff --git a/suppressions.xml b/suppressions.xml
deleted file mode 100644
index f0f4d8cbd..0
--- a/suppressions.xml
+++ /dev/null
@@ -1,28 +0,0 @@
- - -
-https://checkstyle.org/dtds/suppressions_1_2.dtd;>
- - - -
[GitHub] taegeonum commented on issue #176: [NEMO-315] Remove checkstyle settings for javadoc error
taegeonum commented on issue #176: [NEMO-315] Remove checkstyle settings for javadoc error URL: https://github.com/apache/incubator-nemo/pull/176#issuecomment-445660930 Build success in my local mac OSX! I will merge it. Thanks!
[GitHub] jeongyooneo opened a new pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error
jeongyooneo opened a new pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error URL: https://github.com/apache/incubator-nemo/pull/176

JIRA: [NEMO-315: Remove javadoc checkstyle suppression](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-315)

**Major changes:**
- N/A

**Minor changes to note:**
- Checkstyle settings for javadoc error suppression for those modules are removed due to reported build failures.

**Tests for the changes:**
- N/A

**Other comments:**
- N/A
[GitHub] jooykim opened a new pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment
jooykim opened a new pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment URL: https://github.com/apache/incubator-nemo/pull/175

JIRA: [NEMO-310: Exclude 'examples' JAR from release artifacts](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-310)

**Major changes:**
- Modifies examples/beam/pom.xml and examples/spark/pom.xml to exclude them from deployment to Nexus.

**Minor changes to note:**
- N/A

**Tests for the changes:**
- I've checked that it works using "mvn deploy" to Apache's Nexus repository.

**Other comments:**
- N/A
[GitHub] jeongyooneo closed pull request #169: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings
jeongyooneo closed pull request #169: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings URL: https://github.com/apache/incubator-nemo/pull/169

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
index 4d1c9dafe..e912125f7 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -39,8 +39,8 @@

   /**
    * Default constructor.
-   * @param aggregatedDynOptData initial dynamic optimization data.
-   * @param dynOptDataAggregator aggregator for the dynamic optimization data.
+   * @param aggregatedDynOptData per-stage aggregated dynamic optimization data.
+   * @param dynOptDataAggregator aggregator to use.
    */
   public AggregateMetricTransform(final O aggregatedDynOptData,
                                   final BiFunction dynOptDataAggregator) {
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
index 05254bc76..39dc6e036 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -41,8 +41,8 @@

   /**
    * MetricCollectTransform constructor.
-   * @param dynOptData dynamic optimization data.
-   * @param dynOptDataCollector for the data.
+   * @param dynOptData per-task dynamic optimization data.
+   * @param dynOptDataCollector that collects the data.
    * @param closer callback function to be invoked when closing the transform.
    */
   public MetricCollectTransform(final O dynOptData,
diff --git a/suppressions.xml b/suppressions.xml
index c1e89929c..f0f4d8cbd 100644
--- a/suppressions.xml
+++ b/suppressions.xml
@@ -24,5 +24,5 @@ under the License.
+              files="[\\/]runtime[\\/]|[\\/]compiler[\\/]|[\\/]client[\\/]|[\\/]examples[\\/]"/>
[GitHub] jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings
jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings URL: https://github.com/apache/incubator-nemo/pull/170

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle.xml b/checkstyle.xml
index c7f876114..e3d0b2869 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -77,6 +77,10 @@ under the License.
+ + + +
@@ -84,7 +88,7 @@ under the License.
- +
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index f95bc73fc..e849acae3 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -438,6 +438,7 @@ private static Configuration getDeployModeConf(final Configuration jobConf) thro
    * @param jobConf job configuration to get json path.
    * @param pathParameter named parameter represents path to the json file, or an empty string
    * @param contentsParameter named parameter represents contents of the file
+   * @param defaultContentthe default configuration
    * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter}
    * @throws InjectionException exception while injection.
    */
diff --git a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
index 0de21378e..8d7f166d1 100644
--- a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
@@ -28,11 +28,15 @@

   /**
    * Default constructor.
+   * @param metric metric.
    */
   public DataSkewMetricFactory(final Map metric) {
     this.metric = metric;
   }

+  /**
+   * @return the metric.
+   */
   public Map getMetric() {
     return metric;
   }
diff --git a/common/src/main/java/org/apache/nemo/common/HashRange.java b/common/src/main/java/org/apache/nemo/common/HashRange.java
index 1ca00078b..6a5f195cc 100644
--- a/common/src/main/java/org/apache/nemo/common/HashRange.java
+++ b/common/src/main/java/org/apache/nemo/common/HashRange.java
@@ -33,6 +33,7 @@
    * Private constructor.
    * @param rangeBeginInclusive point at which the hash range starts (inclusive).
    * @param rangeEndExclusive point at which the hash range ends (exclusive).
+   * @param isSkewed whether or not the range is skewed
    */
   private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) {
     if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) {
@@ -53,6 +54,7 @@ public static HashRange all() {
   /**
    * @param rangeStartInclusive the start of the range (inclusive)
    * @param rangeEndExclusive the end of the range (exclusive)
+   * @param isSkewed whether or not the range is skewed
    * @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive})
    */
   public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) {
@@ -119,6 +121,9 @@ public boolean equals(final Object o) {
     return true;
   }

+  /**
+   * @return the hash value.
+   */
   @Override
   public int hashCode() {
     return Arrays.hashCode(new Object[] {
@@ -128,6 +133,9 @@ public int hashCode() {
     });
   }

+  /**
+   * @return whether or not the range is skewed.
+   */
   public boolean isSkewed() {
     return isSkewed;
   }
diff --git a/common/src/main/java/org/apache/nemo/common/StateMachine.java b/common/src/main/java/org/apache/nemo/common/StateMachine.java
index 24180e23e..28740a95f 100644
--- a/common/src/main/java/org/apache/nemo/common/StateMachine.java
+++ b/common/src/main/java/org/apache/nemo/common/StateMachine.java
@@ -61,7 +61,7 @@ public synchronized void checkState(final Enum expectedCurrentState) {
    * Sets the current state as a certain state.
    *
    * @param state a state
-   * @throws RuntimeException if the state is unknown state, or the transition
+   * @throws IllegalStateTransitionException the state is unknown state, or the transition
    * from the current state to the specified state is illegal
    */
   public synchronized void setState(final Enum state) throws IllegalStateTransitionException {
@@ -86,7 +86,7 @@ public synchronized void setState(final Enum state) throws IllegalStateTransitio
    * @param state a state
    * @return {@code true} if successful. {@code false} indicates that
    * the actual value was not equal to the expected value.
-   *
[GitHub] jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings
jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings URL: https://github.com/apache/incubator-nemo/pull/170 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle.xml b/checkstyle.xml index c7f876114..e3d0b2869 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -77,6 +77,10 @@ under the License. + + + + @@ -84,7 +88,7 @@ under the License. - + diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java index f95bc73fc..e849acae3 100644 --- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java +++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java @@ -438,6 +438,7 @@ private static Configuration getDeployModeConf(final Configuration jobConf) thro * @param jobConf job configuration to get json path. * @param pathParameter named parameter represents path to the json file, or an empty string * @param contentsParameter named parameter represents contents of the file + * @param defaultContentthe default configuration * @return configuration with contents of the file, or an empty string as value for {@code contentsParameter} * @throws InjectionException exception while injection. */ diff --git a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java index 0de21378e..8d7f166d1 100644 --- a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java +++ b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java @@ -28,11 +28,15 @@ /** * Default constructor. + * @param metric metric. 
*/ public DataSkewMetricFactory(final Map metric) { this.metric = metric; } + /** + * @return the metric. + */ public Map getMetric() { return metric; } diff --git a/common/src/main/java/org/apache/nemo/common/HashRange.java b/common/src/main/java/org/apache/nemo/common/HashRange.java index 1ca00078b..6a5f195cc 100644 --- a/common/src/main/java/org/apache/nemo/common/HashRange.java +++ b/common/src/main/java/org/apache/nemo/common/HashRange.java @@ -33,6 +33,7 @@ * Private constructor. * @param rangeBeginInclusive point at which the hash range starts (inclusive). * @param rangeEndExclusive point at which the hash range ends (exclusive). + * @param isSkewed whether or not the range is skewed */ private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) { if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) { @@ -53,6 +54,7 @@ public static HashRange all() { /** * @param rangeStartInclusive the start of the range (inclusive) * @param rangeEndExclusive the end of the range (exclusive) + * @param isSkewed whether or not the range is skewed * @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive}) */ public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) { @@ -119,6 +121,9 @@ public boolean equals(final Object o) { return true; } + /** + * @return the hash value. + */ @Override public int hashCode() { return Arrays.hashCode(new Object[] { @@ -128,6 +133,9 @@ public int hashCode() { }); } + /** + * @return whether or not the range is skewed. 
+ */ public boolean isSkewed() { return isSkewed; } diff --git a/common/src/main/java/org/apache/nemo/common/StateMachine.java b/common/src/main/java/org/apache/nemo/common/StateMachine.java index 24180e23e..28740a95f 100644 --- a/common/src/main/java/org/apache/nemo/common/StateMachine.java +++ b/common/src/main/java/org/apache/nemo/common/StateMachine.java @@ -61,7 +61,7 @@ public synchronized void checkState(final Enum expectedCurrentState) { * Sets the current state as a certain state. * * @param state a state - * @throws RuntimeException if the state is unknown state, or the transition + * @throws IllegalStateTransitionException the state is unknown state, or the transition * from the current state to the specified state is illegal */ public synchronized void setState(final Enum state) throws IllegalStateTransitionException { @@ -86,7 +86,7 @@ public synchronized void setState(final Enum state) throws IllegalStateTransitio * @param state a state * @return {@code true} if successful. {@code false} indicates that * the actual value was not equal to the expected value. - *
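PR #170 addresses checkstyle's method-Javadoc warnings, and the diff fills in the missing tags: an `@param` for every parameter (including `isSkewed`), an `@return` for every non-void method (including `hashCode()` and `isSkewed()`), and an `@throws` that names the exception actually declared. The following is a minimal sketch of a class documented to that convention. `RangeSketch` is hypothetical and only loosely modeled on the half-open `[begin, end)` semantics of `HashRange`; it is not Nemo's implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical half-open integer range [begin, end) with a skew flag,
 * modeled loosely on HashRange; every method carries @param/@return/@throws tags.
 */
final class RangeSketch {
  private final int beginInclusive;
  private final int endExclusive;
  private final boolean skewed;

  /**
   * Private constructor.
   * @param beginInclusive point at which the range starts (inclusive).
   * @param endExclusive point at which the range ends (exclusive).
   * @param skewed whether or not the range is skewed.
   * @throws IllegalArgumentException if either bound is negative.
   */
  private RangeSketch(final int beginInclusive, final int endExclusive, final boolean skewed) {
    if (beginInclusive < 0 || endExclusive < 0) {
      throw new IllegalArgumentException("Bounds must be non-negative");
    }
    this.beginInclusive = beginInclusive;
    this.endExclusive = endExclusive;
    this.skewed = skewed;
  }

  /**
   * @param beginInclusive the start of the range (inclusive).
   * @param endExclusive the end of the range (exclusive).
   * @param skewed whether or not the range is skewed.
   * @return a range representing [{@code beginInclusive}, {@code endExclusive}).
   * @throws IllegalArgumentException if either bound is negative.
   */
  static RangeSketch of(final int beginInclusive, final int endExclusive, final boolean skewed) {
    return new RangeSketch(beginInclusive, endExclusive, skewed);
  }

  /**
   * @param value the value to test.
   * @return whether {@code value} lies in [begin, end).
   */
  boolean includes(final int value) {
    return value >= beginInclusive && value < endExclusive;
  }

  /**
   * @return whether or not the range is skewed.
   */
  boolean isSkewed() {
    return skewed;
  }

  /**
   * @param o the object to compare with.
   * @return whether {@code o} is a range with the same bounds and skew flag.
   */
  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RangeSketch)) {
      return false;
    }
    final RangeSketch that = (RangeSketch) o;
    return beginInclusive == that.beginInclusive
        && endExclusive == that.endExclusive
        && skewed == that.skewed;
  }

  /**
   * @return the hash value.
   */
  @Override
  public int hashCode() {
    return Arrays.hashCode(new Object[] {beginInclusive, endExclusive, skewed});
  }
}
```

For example, `RangeSketch.of(0, 5, false)` includes 0 through 4 but not 5, and it differs from `RangeSketch.of(0, 5, true)` only in its skew flag.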
[GitHub] johnyangk closed pull request #174: [NEMO-311] Add StreamingPolicy
johnyangk closed pull request #174: [NEMO-311] Add StreamingPolicy URL: https://github.com/apache/incubator-nemo/pull/174 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java
new file mode 100644
index 0..9bdf2d4ed
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.policy;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.PipeTransferForAllEdgesPass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass;
+import org.apache.reef.tang.Injector;
+
+/**
+ * Streaming policy.
+ */
+public final class StreamingPolicy implements Policy {
+  private final Policy policy;
+
+  /**
+   * Default constructor.
+   */
+  public StreamingPolicy() {
+    final PolicyBuilder builder = new PolicyBuilder();
+    builder.registerCompileTimePass(new DefaultCompositePass());
+    builder.registerCompileTimePass(new PipeTransferForAllEdgesPass());
+    this.policy = builder.build();
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
+    return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+  }
+
+  @Override
+  public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
+    this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+  }
+}
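`StreamingPolicy` registers `DefaultCompositePass` first and `PipeTransferForAllEdgesPass` second, then delegates both optimization hooks to the built policy. Assuming passes run in registration order and a later pass overwrites an execution property set by an earlier one, that order is what leaves every edge configured for pipe transfer. The sketch models this with a hypothetical `PolicySketch` class in which an edge is a plain property map. The property name `DataStore` and the values `LocalFileStore` and `Pipe` are illustrative placeholders, not Nemo's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of an ordered compile-time pass pipeline; not Nemo's PolicyBuilder.
// An "edge" is represented as a map from execution-property name to value.
final class PolicySketch {
  private final List<UnaryOperator<Map<String, String>>> passes = new ArrayList<>();

  /** Registers a pass; passes run in the order they were registered. */
  PolicySketch register(final UnaryOperator<Map<String, String>> pass) {
    passes.add(pass);
    return this;
  }

  /** Runs every registered pass, in order, on a copy of the edge properties. */
  Map<String, String> optimize(final Map<String, String> edge) {
    Map<String, String> current = new HashMap<>(edge);
    for (final UnaryOperator<Map<String, String>> pass : passes) {
      current = pass.apply(current);
    }
    return current;
  }

  /** Returns a pass that sets one property, overwriting any earlier value. */
  static UnaryOperator<Map<String, String>> setting(final String key, final String value) {
    return props -> {
      final Map<String, String> copy = new HashMap<>(props);
      copy.put(key, value);
      return copy;
    };
  }

  /** Mirrors StreamingPolicy's registration order with placeholder passes. */
  static PolicySketch streaming() {
    return new PolicySketch()
        .register(setting("DataStore", "LocalFileStore")) // stands in for DefaultCompositePass
        .register(setting("DataStore", "Pipe"));          // stands in for PipeTransferForAllEdgesPass
  }
}
```

Under these assumptions, `PolicySketch.streaming().optimize(new HashMap<>()).get("DataStore")` returns `"Pipe"`; swapping the two registrations would leave `"LocalFileStore"` instead.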
[GitHub] johnyangk closed pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
johnyangk closed pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java index 06dde5861..a4315082a 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -240,15 +241,6 @@ private void triggerTimers(final K key, // The DoFnRunner interface requires WindowedValue, // but this windowed value is actually not used in the ReduceFnRunner internal. 
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem)); - - // output watermark - // we set output watermark to the minimum of the timer data - long keyOutputTimestamp = Long.MAX_VALUE; - for (final TimerInternals.TimerData timer : timerDataList) { -keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis()); - } - - timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp)); } } @@ -349,14 +341,21 @@ public TimerInternals timerInternalsForKey(final K key) { @Override public void emit(final WindowedValue>> output) { - // adds the output timestamp to the watermark hold of each key - // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 - // TODO #270: consider early firing - // TODO #270: This logic may not be applied to early firing outputs - keyAndWatermarkHoldMap.put(output.getValue().getKey(), -new Watermark(output.getTimestamp().getMillis() + 1)); + + // The watermark advances only in ON_TIME + if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) { +final K key = output.getValue().getKey(); +final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals) + inMemoryTimerInternalsFactory.timerInternalsForKey(key); +keyAndWatermarkHoldMap.put(key, + // adds the output timestamp to the watermark hold of each key + // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 + new Watermark(output.getTimestamp().getMillis() + 1)); +timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1)); + } outputCollector.emit(output); } + @Override public void emitWatermark(final Watermark watermark) { outputCollector.emitWatermark(watermark); diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java index 
474c79c0d..f0749c0c0 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java @@ -33,16 +33,20 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; -import static java.util.Collections.emptyList; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.EARLY; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME; +import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; -// TODO #270: Test different triggers public final class GroupByKeyAndWindowDoFnTransformTest { - + private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName()); private
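The diff drops the output-watermark update that was computed from timer data and instead sets a per-key watermark hold in `emit()`, but only for `ON_TIME` panes. The review thread on this PR gives the reasons: EARLY results do not guarantee that the watermark has advanced, and LATE results could move the hold backwards. The following is a minimal sketch of that rule. It assumes a plain map stands in for `keyAndWatermarkHoldMap` and a local `Timing` enum stands in for Beam's `PaneInfo.Timing`. `WatermarkHoldSketch` and its methods are hypothetical, and the sketch omits the `advanceOutputWatermark` call on the per-key timer internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained model of the emit() change in PR #172; not Nemo's actual code.
final class WatermarkHoldSketch {
  /** Stand-in for the Beam PaneInfo.Timing values that matter here. */
  enum Timing { EARLY, ON_TIME, LATE }

  // key -> watermark hold in millis (stand-in for keyAndWatermarkHoldMap)
  private final Map<String, Long> holds = new HashMap<>();

  /**
   * Updates the per-key watermark hold for one output pane.
   * @param key the output key.
   * @param timestampMillis the output timestamp; for window [0, 5000) this is 4999.
   * @param timing the pane timing of the output.
   */
  void emit(final String key, final long timestampMillis, final Timing timing) {
    // EARLY panes fire before the watermark reaches the window end, so they do not
    // imply watermark progress; LATE panes could move the hold backwards.
    if (timing != Timing.ON_TIME) {
      return;
    }
    // +1 turns the window's max timestamp (4999) back into its exclusive end (5000).
    holds.put(key, timestampMillis + 1);
  }

  /**
   * @param key the key to look up.
   * @return the current hold for the key, or null if no ON_TIME pane was emitted.
   */
  Long holdOf(final String key) {
    return holds.get(key);
  }
}
```

For instance, an EARLY pane at 4999 leaves the hold unset, an ON_TIME pane at 4999 sets it to 5000, and a subsequent LATE pane leaves it unchanged.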
[GitHub] taegeonum commented on issue #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
taegeonum commented on issue #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#issuecomment-443989001 @johnyangk I've addressed your comments. Thanks!
[GitHub] taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238541422 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java ## @@ -248,4 +252,120 @@ public void test() { doFnTransform.close(); } + + /** + * Test complex triggers that emit early and late firing. + */ + @Test + public void eventTimeTriggerTest() { +final Duration lateness = Duration.standardSeconds(1); +final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow() + // early firing + .withEarlyFirings( +AfterProcessingTime + .pastFirstElementInPane() + // early firing 1 sec after receiving an element + .plusDelayOf(Duration.millis(1000))) + // late firing: Fire on any late data. + .withLateFirings(AfterPane.elementCountAtLeast(1)); + +final FixedWindows window = (FixedWindows) Window.into( + FixedWindows.of(Duration.standardSeconds(5))) + // lateness + .withAllowedLateness(lateness) + .triggering(trigger) + .accumulatingFiredPanes().getWindowFn(); + +final TupleTag outputTag = new TupleTag<>("main-output"); +final GroupByKeyAndWindowDoFnTransform doFnTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +WindowingStrategy.of(window).withTrigger(trigger) + .withMode(ACCUMULATING_FIRED_PANES) Review comment: I will add todo (but I've tested it without accumulating and it works well)
[GitHub] seojangho closed pull request #173: [NEMO-307] Enable integration test
seojangho closed pull request #173: [NEMO-307] Enable integration test URL: https://github.com/apache/incubator-nemo/pull/173 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index 768ade95f..b77c67f9e 100644 --- a/pom.xml +++ b/pom.xml @@ -163,19 +163,6 @@ under the License. maven-surefire-plugin ${surefire.version} - -org.apache.maven.plugins -maven-failsafe-plugin -${surefire.version} - - - -integration-test -verify - - - - @@ -203,6 +190,19 @@ under the License. + + org.apache.maven.plugins + maven-failsafe-plugin + ${surefire.version} + + + +integration-test +verify + + + + org.codehaus.mojo build-helper-maven-plugin
[GitHub] taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238541138 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -351,12 +343,17 @@ public TimerInternals timerInternalsForKey(final K key) { public void emit(final WindowedValue>> output) { // adds the output timestamp to the watermark hold of each key // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 - // TODO #270: consider early firing - // TODO #270: This logic may not be applied to early firing outputs - keyAndWatermarkHoldMap.put(output.getValue().getKey(), -new Watermark(output.getTimestamp().getMillis() + 1)); + if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) { Review comment: EARLY does not guarantee watermark advance, so we should exclude early results.
[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238537290 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java ## @@ -351,12 +343,17 @@ public TimerInternals timerInternalsForKey(final K key) { public void emit(final WindowedValue>> output) { // adds the output timestamp to the watermark hold of each key // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999 - // TODO #270: consider early firing - // TODO #270: This logic may not be applied to early firing outputs - keyAndWatermarkHoldMap.put(output.getValue().getKey(), -new Watermark(output.getTimestamp().getMillis() + 1)); + if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) { Review comment: Can you add a brief comment on why we advance the output watermark for only ON_TIME outputs? Excluding LATE seems obvious (we don't want to advance 'backwards'), but what about EARLY?
[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238537837 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java ## @@ -248,4 +252,120 @@ public void test() { doFnTransform.close(); } + + /** + * Test complex triggers that emit early and late firing. + */ + @Test + public void eventTimeTriggerTest() { +final Duration lateness = Duration.standardSeconds(1); +final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow() + // early firing + .withEarlyFirings( +AfterProcessingTime + .pastFirstElementInPane() + // early firing 1 sec after receiving an element + .plusDelayOf(Duration.millis(1000))) + // late firing: Fire on any late data. + .withLateFirings(AfterPane.elementCountAtLeast(1)); + +final FixedWindows window = (FixedWindows) Window.into( + FixedWindows.of(Duration.standardSeconds(5))) + // lateness + .withAllowedLateness(lateness) + .triggering(trigger) + .accumulatingFiredPanes().getWindowFn(); + +final TupleTag outputTag = new TupleTag<>("main-output"); +final GroupByKeyAndWindowDoFnTransform doFnTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +WindowingStrategy.of(window).withTrigger(trigger) + .withMode(ACCUMULATING_FIRED_PANES) +.withAllowedLateness(lateness), +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER), +DisplayData.none()); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector>> oc = new TestOutputCollector(); +doFnTransform.prepare(context, oc); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING)); + +// early firing is not related to the watermark progress 
+doFnTransform.onWatermark(new Watermark(2)); +assertEquals(1, oc.outputs.size()); +assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +oc.outputs.clear(); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); +// EARLY firing... waiting >= 1 sec +try { + Thread.sleep(2000); +} catch (InterruptedException e) { + e.printStackTrace(); +} + +doFnTransform.onWatermark(new Watermark(5)); Review comment: I suppose this effectively advances processing time? Can you add a comment about that? Maybe also check that `oc.outputs` is empty prior to this line?
[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238539403 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java ## @@ -248,4 +252,120 @@ public void test() { doFnTransform.close(); } + + /** + * Test complex triggers that emit early and late firing. + */ + @Test + public void eventTimeTriggerTest() { +final Duration lateness = Duration.standardSeconds(1); +final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow() + // early firing + .withEarlyFirings( +AfterProcessingTime + .pastFirstElementInPane() + // early firing 1 sec after receiving an element + .plusDelayOf(Duration.millis(1000))) + // late firing: Fire on any late data. + .withLateFirings(AfterPane.elementCountAtLeast(1)); + +final FixedWindows window = (FixedWindows) Window.into( + FixedWindows.of(Duration.standardSeconds(5))) + // lateness + .withAllowedLateness(lateness) + .triggering(trigger) + .accumulatingFiredPanes().getWindowFn(); + +final TupleTag outputTag = new TupleTag<>("main-output"); +final GroupByKeyAndWindowDoFnTransform doFnTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +WindowingStrategy.of(window).withTrigger(trigger) + .withMode(ACCUMULATING_FIRED_PANES) +.withAllowedLateness(lateness), +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER), +DisplayData.none()); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector>> oc = new TestOutputCollector(); +doFnTransform.prepare(context, oc); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING)); + +// early firing is not related to the watermark progress 
+doFnTransform.onWatermark(new Watermark(2)); +assertEquals(1, oc.outputs.size()); +assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +oc.outputs.clear(); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); +// EARLY firing... waiting >= 1 sec +try { + Thread.sleep(2000); +} catch (InterruptedException e) { + e.printStackTrace(); +} + +doFnTransform.onWatermark(new Watermark(5)); +assertEquals(1, oc.outputs.size()); +assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); +// ACCUMULATION MODE +checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue()); +LOG.info("Output: {}", oc.outputs.get(0)); +oc.outputs.clear(); + +// ON TIME +doFnTransform.onData(WindowedValue.of( + KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); +doFnTransform.onWatermark(new Watermark(5001)); +assertEquals(1, oc.outputs.size()); +assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +// ACCUMULATION MODE +checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue()); +oc.outputs.clear(); + +// LATE DATA +// actual window: [0-5000) +// allowed lateness: 1000 (ms) +// current watermark: 5001 +// data: 4500 +// the data timestamp + allowed lateness > current watermark, +// so it should be accumulated to the prev window +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello!"), new Instant(4500), Review comment: hello -> hello again
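The test comments phrase the lateness check per element: the data timestamp plus the allowed lateness, compared with the current watermark. As we understand Beam's late-data dropping, the check is actually per window: data for a window is dropped once the input watermark passes the window's max timestamp plus the allowed lateness. For the values in this test, both phrasings agree. With window [0, 5000) and 1000 ms lateness, the cutoff is 4999 + 1000 = 5999. Data arriving at watermark 5001 is therefore kept, and data arriving at watermark 6000 (the 4800 element later in this test) is dropped. `LatenessSketch` is a hypothetical helper, not a Beam or Nemo API.

```java
// Hypothetical helper; assumes the rule that a window's data is dropped once the
// input watermark passes the window's max timestamp plus the allowed lateness.
final class LatenessSketch {
  private LatenessSketch() {
  }

  /**
   * @param windowEndExclusive end of a fixed window, e.g. 5000 for [0, 5000).
   * @param allowedLatenessMillis allowed lateness in millis, e.g. 1000.
   * @param inputWatermarkMillis the current input watermark in millis.
   * @return whether late data for this window would be dropped.
   */
  static boolean isExpired(final long windowEndExclusive, final long allowedLatenessMillis,
                           final long inputWatermarkMillis) {
    final long maxTimestamp = windowEndExclusive - 1;         // 4999 for [0, 5000)
    final long gcTime = maxTimestamp + allowedLatenessMillis; // 5999 with 1000 ms lateness
    return gcTime < inputWatermarkMillis;
  }
}
```

Keying the check on the window rather than the element means every element of a window is accepted or dropped together, however close its own timestamp is to the window end.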
[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238538386 ## File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java ## @@ -248,4 +252,120 @@ public void test() { doFnTransform.close(); } + + /** + * Test complex triggers that emit early and late firing. + */ + @Test + public void eventTimeTriggerTest() { +final Duration lateness = Duration.standardSeconds(1); +final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow() + // early firing + .withEarlyFirings( +AfterProcessingTime + .pastFirstElementInPane() + // early firing 1 sec after receiving an element + .plusDelayOf(Duration.millis(1000))) + // late firing: Fire on any late data. + .withLateFirings(AfterPane.elementCountAtLeast(1)); + +final FixedWindows window = (FixedWindows) Window.into( + FixedWindows.of(Duration.standardSeconds(5))) + // lateness + .withAllowedLateness(lateness) + .triggering(trigger) + .accumulatingFiredPanes().getWindowFn(); + +final TupleTag outputTag = new TupleTag<>("main-output"); +final GroupByKeyAndWindowDoFnTransform doFnTransform = + new GroupByKeyAndWindowDoFnTransform( +NULL_OUTPUT_CODERS, +outputTag, +WindowingStrategy.of(window).withTrigger(trigger) + .withMode(ACCUMULATING_FIRED_PANES) +.withAllowedLateness(lateness), +PipelineOptionsFactory.as(NemoPipelineOptions.class), +SystemReduceFn.buffering(NULL_INPUT_CODER), +DisplayData.none()); + + +final Transform.Context context = mock(Transform.Context.class); +final TestOutputCollector>> oc = new TestOutputCollector(); +doFnTransform.prepare(context, oc); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING)); + +// early firing is not related to the watermark progress 
+doFnTransform.onWatermark(new Watermark(2)); +assertEquals(1, oc.outputs.size()); +assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +oc.outputs.clear(); + +doFnTransform.onData(WindowedValue.of( + KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); +// EARLY firing... waiting >= 1 sec +try { + Thread.sleep(2000); +} catch (InterruptedException e) { + e.printStackTrace(); +} + +doFnTransform.onWatermark(new Watermark(5)); +assertEquals(1, oc.outputs.size()); +assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming()); +// ACCUMULATION MODE +checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue()); +LOG.info("Output: {}", oc.outputs.get(0)); +oc.outputs.clear(); + +// ON TIME +doFnTransform.onData(WindowedValue.of( + KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING)); +doFnTransform.onWatermark(new Watermark(5001)); +assertEquals(1, oc.outputs.size()); +assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +// ACCUMULATION MODE +checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue()); +oc.outputs.clear(); + +// LATE DATA +// actual window: [0-5000) +// allowed lateness: 1000 (ms) +// current watermark: 5001 +// data: 4500 +// the data timestamp + allowed lateness > current watermark, +// so it should be accumulated to the prev window +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello!"), new Instant(4500), + window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING)); +doFnTransform.onWatermark(new Watermark(6000)); +assertEquals(1, oc.outputs.size()); +assertEquals(LATE, oc.outputs.get(0).getPane().getTiming()); +LOG.info("Output: {}", oc.outputs.get(0)); +// The data should be accumulated to the previous window because it allows 1 second lateness +checkOutput(KV.of("1", Arrays.asList("hello", 
"world", "!!", "hello!")), oc.outputs.get(0).getValue()); +oc.outputs.clear(); + +// LATE DATA +// data timestamp: 4800 +// current watermark: 6000 +// data timestamp + allowed lateness < current watermark +// It should not be accumulated to the prev window +doFnTransform.onData(WindowedValue.of( + KV.of("1", "hello!"), new Instant(4800), Review comment: hello -> bye
[GitHub] taegeonum opened a new pull request #173: [NEMO-307] Enable integration test
taegeonum opened a new pull request #173: [NEMO-307] Enable integration test URL: https://github.com/apache/incubator-nemo/pull/173
JIRA: [NEMO-307: Enable integration test](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-307)

**Major changes:**
- Fix pom to enable integration test
[GitHub] taegeonum opened a new pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
taegeonum opened a new pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest URL: https://github.com/apache/incubator-nemo/pull/172 JIRA: [NEMO-270: Test different triggers in GroupByKeyAndWindowDoFnTransformTest](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-270) **Major changes:** - Fix `GroupByKeyAndWindowDoFnTransform` to properly handle `EARLY` and `LATE` triggering. **Tests for the changes:** - Add `eventTimeTriggerTest` to test complex triggering and lateness.
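The new test walks a single window through `EARLY`, `ON_TIME`, and `LATE` panes. Below is a simplified sketch of how a pane's timing can be derived from the watermark. It assumes the usual Beam convention that the on-time pane is the first pane emitted after the watermark passes the end of the window. `PaneTimingSketch` is hypothetical and ignores the trigger machinery that decides *when* a pane fires.

```java
/** Hypothetical sketch (not Nemo or Beam code) of classifying a pane's timing. */
final class PaneTimingSketch {
  /** Mirrors the three timings asserted in the test. */
  enum Timing { EARLY, ON_TIME, LATE }

  private PaneTimingSketch() {
  }

  /**
   * @param windowMaxTimestamp largest timestamp in the window (window end - 1)
   * @param watermark          current input watermark when the pane fires
   * @param onTimePaneEmitted  whether this window's ON_TIME pane has already fired
   */
  static Timing classify(final long windowMaxTimestamp, final long watermark,
                         final boolean onTimePaneEmitted) {
    if (watermark <= windowMaxTimestamp) {
      // The watermark has not passed the end of the window yet.
      return Timing.EARLY;
    }
    // First firing past the window end is on time; any later firing is late.
    return onTimePaneEmitted ? Timing.LATE : Timing.ON_TIME;
  }
}
```

The test's window `[0, 5000)` has max timestamp 4999. For that window, the firings at watermarks 2 and 5 are `EARLY`, the firing at 5001 is `ON_TIME`, and the firing at 6000 is `LATE`.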
[GitHub] taegeonum closed pull request #171: [NEMO-306] Add license checkstyle
taegeonum closed pull request #171: [NEMO-306] Add license checkstyle URL: https://github.com/apache/incubator-nemo/pull/171
[GitHub] taegeonum closed pull request #163: [NEMO-294] Beam-Runner
taegeonum closed pull request #163: [NEMO-294] Beam-Runner URL: https://github.com/apache/incubator-nemo/pull/163 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/run_beam.sh b/bin/run_beam.sh index 41c1ef598..cbd082c7a 100755 --- a/bin/run_beam.sh +++ b/bin/run_beam.sh @@ -17,4 +17,6 @@ # specific language governing permissions and limitations # under the License. -java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@" +java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \ + -Dexec.executable=echo -Dexec.args='${project.version}' \ + --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@" diff --git a/bin/run_spark.sh b/bin/run_spark.sh index 057b01747..314fd0d0b 100755 --- a/bin/run_spark.sh +++ b/bin/run_spark.sh @@ -17,4 +17,6 @@ # specific language governing permissions and limitations # under the License. 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@" +java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-$(mvn -q \ + -Dexec.executable=echo -Dexec.args='${project.version}' \ + --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@" diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java index 035d71985..f95bc73fc 100644 --- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java +++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java @@ -101,6 +101,23 @@ private JobLauncher() { * @throws Exception exception on the way. */ public static void main(final String[] args) throws Exception { +try { + setup(args); + // Launch client main. The shutdown() method is called inside the launchDAG() method. + runUserProgramMain(builtJobConf); +} catch (final InjectionException e) { + throw new RuntimeException(e); +} + } + + /** + * Set up the driver, etc. before the actual execution. + * @param args arguments. + * @throws InjectionException injection exception from REEF. + * @throws ClassNotFoundException class not found exception. + * @throws IOException IO exception. 
+ */ + public static void setup(final String[] args) throws InjectionException, ClassNotFoundException, IOException { // Get Job and Driver Confs builtJobConf = getJobConf(args); @@ -108,77 +125,76 @@ public static void main(final String[] args) throws Exception { LOG.info("Launching RPC Server"); driverRPCServer = new DriverRPCServer(); driverRPCServer - .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { -}) -.registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown()) - .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown()) - .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll( - SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData() -.run(); + .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { + }) + .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown()) + .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown()) + .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll( + SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData() + .run(); final Configuration driverConf = getDriverConf(builtJobConf); final Configuration driverNcsConf = getDriverNcsConf(); -final Configuration driverMessageConfg = getDriverMessageConf(); +final Configuration driverMessageConfig = getDriverMessageConf(); +final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5}," + + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]"; final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class, -JobConf.ExecutorJSONContents.class); + 
JobConf.ExecutorJSONContents.class,
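The `JobLauncher` change splits `main` into a reusable `setup(args)` step plus the actual run, so that a Beam application can drive the launcher itself. Below is a stripped-down sketch of that shape. `MiniLauncher`, `runUserProgram`, and `ConfigException` (standing in for REEF's checked `InjectionException`) are hypothetical names, not Nemo code.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a checked configuration error such as REEF's InjectionException. */
class ConfigException extends Exception {
  ConfigException(final String message) {
    super(message);
  }
}

/** Minimal sketch of a launcher whose setup step can be reused by other entry points. */
final class MiniLauncher {
  private static List<String> jobConf; // stands in for the built job configuration
  private static boolean ran = false;

  private MiniLauncher() {
  }

  /** Entry point: set up, then run; checked setup failures are rethrown unchecked. */
  public static void main(final String[] args) {
    try {
      setup(args);
      runUserProgram();
    } catch (final ConfigException e) {
      throw new RuntimeException(e);
    }
  }

  /** Builds the job configuration; callable on its own, e.g. from a runner. */
  static void setup(final String[] args) throws ConfigException {
    if (args.length == 0) {
      throw new ConfigException("no arguments given");
    }
    jobConf = Arrays.asList(args);
  }

  /** Runs the user program with the configuration prepared by setup(). */
  static void runUserProgram() {
    ran = jobConf != null;
  }

  static boolean hasRun() {
    return ran;
  }
}
```

Keeping `setup` separate from the run step lets another entry point, such as a runner, prepare the configuration and then decide for itself when to launch the job.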
[GitHub] jooykim opened a new pull request #171: [NEMO-306] Add license checkstyle
jooykim opened a new pull request #171: [NEMO-306] Add license checkstyle URL: https://github.com/apache/incubator-nemo/pull/171 JIRA: [NEMO-306: Apache License Checkstyle](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-306) **Major changes:** - Adds a checkstyle rule to check Apache license headers **Minor changes to note:** - N/A **Tests for the changes:** - N/A **Other comments:** - N/A
[GitHub] seojangho closed pull request #164: [NEMO-293] OOM exception in streaming
seojangho closed pull request #164: [NEMO-293] OOM exception in streaming URL: https://github.com/apache/incubator-nemo/pull/164 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java index 315760c99..12761b27f 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java @@ -18,10 +18,14 @@ */ package org.apache.nemo.runtime.executor.bytetransfer; +import io.netty.buffer.ByteBufOutputStream; +import org.apache.nemo.common.coder.EncoderFactory; +import org.apache.nemo.runtime.executor.data.DataUtil; import org.apache.nemo.runtime.executor.data.FileArea; import org.apache.nemo.runtime.executor.data.partition.SerializedPartition; import io.netty.buffer.ByteBuf; import io.netty.channel.*; +import org.apache.nemo.runtime.executor.data.streamchainer.Serializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +98,7 @@ public void close() throws IOException { currentByteOutputStream.close(); } channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId())) -.addListener(getChannelWriteListener()); + .addListener(getChannelWriteListener()); deregister(); closed = true; } @@ -150,7 +154,7 @@ public void write(final byte[] bytes, final int offset, final int length) throws * @throws IOException when an exception has been set or this stream was closed */ public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition) -throws IOException { + 
throws IOException { write(serializedPartition.getData(), 0, serializedPartition.getLength()); return this; } @@ -177,7 +181,7 @@ public ByteOutputStream writeFileArea(final FileArea fileArea) throws IOExceptio } @Override -public synchronized void close() throws IOException { +public void close() throws IOException { if (closed) { return; } @@ -198,19 +202,41 @@ private void writeByteBuf(final ByteBuf byteBuf) throws IOException { } } +/** + * Write an element to the channel. + * @param element element + * @param serializer serializer + */ +public void writeElement(final Object element, + final Serializer serializer) { + final ByteBuf byteBuf = channel.alloc().ioBuffer(); + final ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf); + try { +final OutputStream wrapped = + DataUtil.buildOutputStream(byteBufOutputStream, serializer.getEncodeStreamChainers()); +final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped); +encoder.encode(element); +wrapped.close(); + +writeByteBuf(byteBuf); + } catch (final IOException e) { +throw new RuntimeException(e); + } +} + /** * Writes a data frame. 
* @param body the body or {@code null} * @param length the length of the body, in bytes * @throws IOException when an exception has been set or this stream was closed */ -private synchronized void writeDataFrame(final Object body, final long length) throws IOException { +private void writeDataFrame(final Object body, final long length) throws IOException { ensureNoException(); if (closed) { throw new IOException("Stream already closed."); } channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream)) - .addListener(getChannelWriteListener()); +.addListener(getChannelWriteListener()); newSubStream = false; } } diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java index d50ad82fc..9995e6a92 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java @@ -37,6 +37,8 @@ /** * It forwards output to the next operator. * @param nextOperatorVertex next operator to emit data and watermark + * @param edgeIndex edge index + * @param watermarkManager watermark manager
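`writeElement` allocates a `ByteBuf` from the channel and pushes the element through the serializer's encode stream chainers and encoder. It then closes the wrapped stream and writes the buffer. The same wrap-then-encode pattern can be sketched with plain `java.io`. In this sketch, `GZIPOutputStream` stands in for a compression chainer, `DataOutputStream.writeUTF` for the encoder, and a `ByteArrayOutputStream` for the `ByteBuf`. All of these are substitutions, not Nemo's classes, and `ElementEncodingSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch of encoding one element through a chain of wrapping streams. */
final class ElementEncodingSketch {
  private ElementEncodingSketch() {
  }

  /** Encodes a string element, compressing it on the way into the buffer. */
  static byte[] encodeElement(final String element) {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); // plays the ByteBuf role
    // Chain: encoder -> compression chainer -> buffer. Closing the outermost stream
    // flushes the whole chain, including the GZIP trailer.
    try (DataOutputStream encoder = new DataOutputStream(new GZIPOutputStream(buffer))) {
      encoder.writeUTF(element);
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Reverses encodeElement, for checking the round trip. */
  static String decodeElement(final byte[] bytes) {
    try (DataInputStream in = new DataInputStream(
        new GZIPInputStream(new ByteArrayInputStream(bytes)))) {
      return in.readUTF();
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Closing the chain before the buffer is used matters. Chainers such as GZIP only write their trailer on close, so reading the buffer any earlier would yield a truncated payload.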
[GitHub] ejjeong opened a new pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings
ejjeong opened a new pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings URL: https://github.com/apache/incubator-nemo/pull/170 JIRA: [NEMO-10: Handle Method Javadocs Requirements for Checkstyle Warnings](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-10) **Major changes:** - Handle Method Javadocs Requirements for Checkstyle Warnings in `compiler` **Minor changes to note:** - Removed the unused parameter of the `generateMetricCollectVertex` method in `SkewReshapingPass.java`. **Tests for the changes:** - Maven checkstyle tests. Closes #170
[GitHub] taegeonum commented on issue #164: [NEMO-293] OOM exception in streaming
taegeonum commented on issue #164: [NEMO-293] OOM exception in streaming URL: https://github.com/apache/incubator-nemo/pull/164#issuecomment-443131143 @seojangho I've addressed your comments. Thanks!
[GitHub] taegeonum commented on issue #163: [NEMO-294] Beam-Runner
taegeonum commented on issue #163: [NEMO-294] Beam-Runner URL: https://github.com/apache/incubator-nemo/pull/163#issuecomment-443131052 @johnyangk @seojangho Could you please take a look?
[GitHub] seojangho closed pull request #168: [NEMO-305] Add DISCLAIMER
seojangho closed pull request #168: [NEMO-305] Add DISCLAIMER URL: https://github.com/apache/incubator-nemo/pull/168
[GitHub] jooykim opened a new pull request #168: [NEMO-305] Add DISCLAIMER
jooykim opened a new pull request #168: [NEMO-305] Add DISCLAIMER URL: https://github.com/apache/incubator-nemo/pull/168 JIRA: [NEMO-305: Add Disclaimer](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-305) **Major changes:** - Adds the disclaimer file as per Apache's standards (https://incubator.apache.org/guides/branding.html#disclaimers) **Minor changes to note:** - N/A **Tests for the changes:** - N/A **Other comments:** - N/A
[GitHub] yunseong opened a new pull request #167: [NEMO-304] Fail-fast for mis-configuration in user application
yunseong opened a new pull request #167: [NEMO-304] Fail-fast for mis-configuration in user application URL: https://github.com/apache/incubator-nemo/pull/167 JIRA: [NEMO-304: Fail-fast for mis-configuration in user application](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-304) **Major changes:** - This PR makes JobLauncher check the validity of the user main method early, so that we can avoid initializing the whole system if there is a mis-configuration. **Minor changes to note:** - **Tests for the changes:** - Added `testNotExistingUserMain` in SparkScala.java to test whether the `InvalidUserMainException` is thrown if the argument of `addUserMain()` is a nonexistent class. **Other comments:** - Closes #GITHUB_PR_NUMBER
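Validating the user main class up front lets the launcher fail before the driver and executors are brought up. Below is a reflection-based sketch of such a check. `UserMainValidator` and `SampleApp` are hypothetical. This `InvalidUserMainException` is only a stand-in for the exception named in the PR, and the PR's actual definition and check may differ.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Stand-in definition; the PR's actual exception class may differ. */
class InvalidUserMainException extends RuntimeException {
  InvalidUserMainException(final String message, final Throwable cause) {
    super(message, cause);
  }
}

/** Hypothetical user program with a valid entry point. */
final class SampleApp {
  public static void main(final String[] args) {
  }
}

/** Hypothetical fail-fast check that a class exposes a public static main(String[]). */
final class UserMainValidator {
  private UserMainValidator() {
  }

  static Method validate(final String className) {
    final Class<?> clazz;
    try {
      clazz = Class.forName(className);
    } catch (final ClassNotFoundException e) {
      throw new InvalidUserMainException("User main class not found: " + className, e);
    }
    try {
      // getMethod only finds public methods, which is what the JVM requires for main.
      final Method main = clazz.getMethod("main", String[].class);
      if (!Modifier.isStatic(main.getModifiers())) {
        throw new InvalidUserMainException("main(String[]) is not static in " + className, null);
      }
      return main;
    } catch (final NoSuchMethodException e) {
      throw new InvalidUserMainException("No public main(String[]) in " + className, e);
    }
  }
}
```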
[GitHub] DifferentSC opened a new pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
DifferentSC opened a new pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder URL: https://github.com/apache/incubator-nemo/pull/166 JIRA: [NEMO-###: TITLE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-###) **Major changes:** - Add slf4j-simple dependency to fix the problem. **Minor changes to note:** - **Tests for the changes:** - **Other comments:** - Closes #GITHUB_PR_NUMBER
[GitHub] seojangho commented on issue #164: [NEMO-293] OOM exception in streaming
seojangho commented on issue #164: [NEMO-293] OOM exception in streaming URL: https://github.com/apache/incubator-nemo/pull/164#issuecomment-443077811 - From the batch perspective, that flag in `writeDataFrame` alone does not help, since using `writeFileArea` is also a common case. - Can you split the PR? Mixing a bugfix and refactoring makes little sense to me.
[GitHub] yunseong opened a new pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side …
yunseong opened a new pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side … URL: https://github.com/apache/incubator-nemo/pull/165 …initialization JIRA: [NEMO-221: Confusing error messages for a failure at the client-side initialization](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-221) **Major changes:** - Throw a separate exception if creating the RPC connection between the Nemo client and the REEF Driver fails (currently, the IOException is swallowed by Tang's InjectionException). This way, we can find the actual reason for the failure, although the beginning of the stack trace is still quite lengthy when the network fails. An example is as follows: ``` Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: Thread main threw an uncaught exception. at org.apache.nemo.client.JobLauncher.main(JobLauncher.java:180) Caused by: java.lang.Exception: Thread main threw an uncaught exception. at org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler.uncaughtException(REEFUncaughtExceptionHandler.java:68) at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057) at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052) at java.lang.Thread.dispatchUncaughtException(Thread.java:1959) Caused by: java.lang.RuntimeException: Unable to configure and start REEFEnvironment.
at org.apache.reef.runtime.common.REEFLauncher.fatal(REEFLauncher.java:202) at org.apache.reef.runtime.common.REEFLauncher.main(REEFLauncher.java:183) Caused by: java.lang.RuntimeException: org.apache.reef.tang.exceptions.InjectionException: Could not invoke constructor: new ClientRPC(TransportFactory = [ClassNodeImpl 'org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory']: org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory(org.apache.reef.wake.remote.address.LocalAddressProvider), = org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory@6bb4dd34, LocalAddressProvider = [ClassNodeImpl 'org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider']: org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider(), = org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider@7d9f158f, String ClientSideRPCServerHost = 127.0.0.1, Integer ClientSideRPCServerPort = 19888) ... (25 lines) ... at org.apache.reef.runtime.common.REEFEnvironment.fromConfiguration(REEFEnvironment.java:69) at org.apache.reef.runtime.common.REEFLauncher.main(REEFLauncher.java:180) Caused by: org.apache.reef.tang.exceptions.InjectionException: Could not invoke constructor: new ClientRPC(TransportFactory = [ClassNodeImpl 'org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory']: org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory(org.apache.reef.wake.remote.address.LocalAddressProvider), = org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory@6bb4dd34, LocalAddressProvider = [ClassNodeImpl 'org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider']: org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider(), = org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider@7d9f158f, String ClientSideRPCServerHost = 127.0.0.1, Integer ClientSideRPCServerPort = 19888) at 
org.apache.reef.tang.implementation.java.InjectorImpl.injectFromPlan(InjectorImpl.java:654) ... (10 lines) ... at org.apache.reef.tang.implementation.java.InjectorImpl.getNamedInstance(InjectorImpl.java:546) at org.apache.reef.tang.InjectionFuture.get(InjectionFuture.java:116) ... 26 more Caused by: java.lang.IllegalStateException: Failed to setup an RPC connection to the Client. A failure at the client-side is suspected. at org.apache.nemo.runtime.master.ClientRPC.<init>(ClientRPC.java:68) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.reef.tang.implementation.java.InjectorImpl.injectFromPlan(InjectorImpl.java:637) ... 37 more ``` **Minor changes to note:** - **Tests for the changes:** - **Other comments:** - Closes #GITHUB_PR_NUMBER
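The fix surfaces the root cause instead of letting it disappear inside a generic injection failure. Below is a minimal sketch of a constructor that turns a connection failure into the descriptive `IllegalStateException` at the bottom of the trace, while keeping the original `IOException` as its cause. `ClientConnection` and `Connector` are hypothetical stand-ins for Nemo's `ClientRPC` and its transport.

```java
import java.io.IOException;

/** Hypothetical stand-in for a transport that may fail to connect. */
interface Connector {
  void connect(String host, int port) throws IOException;
}

/** Sketch: fail with a descriptive exception while keeping the root cause attached. */
final class ClientConnection {
  ClientConnection(final Connector connector, final String host, final int port) {
    try {
      connector.connect(host, port);
    } catch (final IOException e) {
      // Chaining the IOException keeps the real reason visible in the "Caused by:" section.
      throw new IllegalStateException(
          "Failed to setup an RPC connection to the Client. A failure at the client-side is suspected.", e);
    }
  }
}
```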
[GitHub] wonook commented on a change in pull request #163: [NEMO-294] Beam-Runner
wonook commented on a change in pull request #163: [NEMO-294] Beam-Runner URL: https://github.com/apache/incubator-nemo/pull/163#discussion_r237151236 ## File path: client/src/main/java/org/apache/nemo/client/JobLauncher.java ## @@ -101,84 +101,100 @@ private JobLauncher() { * @throws Exception exception on the way. */ public static void main(final String[] args) throws Exception { +try { + setup(args); + // Launch client main. The shutdown() method is called inside the launchDAG() method. + runUserProgramMain(builtJobConf); +} catch (final InjectionException e) { + throw new RuntimeException(e); +} + } + + /** + * Set up the driver, etc. before the actual execution. + * @param args arguments. + * @throws InjectionException injection exception from REEF. + * @throws ClassNotFoundException class not found exception. + * @throws IOException IO exception. + */ + public static void setup(final String[] args) throws InjectionException, ClassNotFoundException, IOException { // Get Job and Driver Confs builtJobConf = getJobConf(args); // Registers actions for launching the DAG. 
LOG.info("Launching RPC Server"); driverRPCServer = new DriverRPCServer(); driverRPCServer - .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { -}) -.registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown()) - .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown()) - .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll( - SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData() -.run(); + .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> { + }) + .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown()) + .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown()) + .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll( + SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData() + .run(); final Configuration driverConf = getDriverConf(builtJobConf); final Configuration driverNcsConf = getDriverNcsConf(); -final Configuration driverMessageConfg = getDriverMessageConf(); +final Configuration driverMessageConfig = getDriverMessageConf(); +final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5}," Review comment: When building a JAR file and executing it, the JSON file sadly doesn't get included in the JAR. I tried moving it into the `src/main/resources` directory, but it didn't work. This change was made to make it possible to apply a default `executor.json` even when it is not explicitly specified.
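With the inline default, the launcher no longer needs an `executor.json` on disk. Below is a sketch of the resulting fallback. It assumes the default applies only when no executor JSON path is configured. `ExecutorConfigResolver` is a hypothetical name, while the JSON value is the default string from the diff.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/** Hypothetical resolver for executor resource configuration. */
final class ExecutorConfigResolver {
  /** Default executor resources, copied from the string literal in the JobLauncher diff. */
  static final String DEFAULT_EXECUTOR_JSON =
      "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
          + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";

  private ExecutorConfigResolver() {
  }

  /** Returns the file contents if a path is configured, otherwise the inline default. */
  static String resolve(final String jsonPath) {
    if (jsonPath == null || jsonPath.isEmpty()) {
      return DEFAULT_EXECUTOR_JSON;
    }
    try {
      return new String(Files.readAllBytes(Paths.get(jsonPath)), StandardCharsets.UTF_8);
    } catch (final IOException e) {
      // An explicitly configured but unreadable path fails loudly instead of falling back.
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch, an explicitly configured path that cannot be read still fails rather than silently falling back. That way, a typo in the path is not masked by the default.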
[GitHub] taegeonum opened a new pull request #164: [NEMO-293] OOM exception in streaming
taegeonum opened a new pull request #164: [NEMO-293] OOM exception in streaming URL: https://github.com/apache/incubator-nemo/pull/164 JIRA: [NEMO-293: OOM exception in streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-293) **Major changes:** - Fix the wrong byte encoding in `PipeOutputWriter`, which causes OOM because it sends unnecessary bytes (count <= byte array size).
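One way to read the `PipeOutputWriter` description: a reused buffer holds only `count` valid bytes, but the whole backing array, up to its full capacity, was being sent. Below is a hypothetical sketch of the difference. `ReusableBuffer` is illustrative only and is not the actual `PipeOutputWriter` code.

```java
import java.util.Arrays;

/** Hypothetical reusable write buffer: capacity is fixed, count tracks the filled prefix. */
final class ReusableBuffer {
  private final byte[] data;
  private int count = 0;

  ReusableBuffer(final int capacity) {
    this.data = new byte[capacity];
  }

  /** Appends bytes after the already-filled prefix. */
  void put(final byte[] bytes) {
    if (count + bytes.length > data.length) {
      throw new IllegalStateException("buffer full");
    }
    System.arraycopy(bytes, 0, data, count, bytes.length);
    count += bytes.length;
  }

  /** Buggy payload: the whole backing array, so every send costs the full capacity. */
  byte[] payloadIncludingUnusedCapacity() {
    return data.clone();
  }

  /** Correct payload: only the count bytes that were actually written. */
  byte[] payload() {
    return Arrays.copyOf(data, count);
  }
}
```

Sending only the valid prefix keeps the per-element cost proportional to the encoded size rather than the buffer capacity. This plausibly matters most in streaming, where elements are sent one at a time.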
[GitHub] sj6077 commented on issue #146: [NEMO-244] Organize integration tests and resources
sj6077 commented on issue #146: [NEMO-244] Organize integration tests and resources URL: https://github.com/apache/incubator-nemo/pull/146#issuecomment-442362535 @sanha Please merge this PR.
[GitHub] wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo
wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo URL: https://github.com/apache/incubator-nemo/pull/95#discussion_r236939076 ## File path: .travis.yml ## @@ -17,17 +17,29 @@ # under the License. # .travis.yml -# For maven builds -language: java +language: + # For maven builds + - java + # For web ui build + - node_js dist: trusty +<<< HEAD Review comment: This is preventing Travis CI from checking this PR. Can this be fixed? I think just replacing `sudo:false` with `sudo:required` in the original code would be sufficient. @skystar-p or @seojangho ? I don't have access to the original branch.
[GitHub] wonook opened a new pull request #163: [NEMO-294] Beam-Runner
wonook opened a new pull request #163: [NEMO-294] Beam-Runner URL: https://github.com/apache/incubator-nemo/pull/163 JIRA: [NEMO-294: Support Nemo Runner execution by providing PipelineOptions to the Beam program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-294)
**Major changes:**
- Update launch scripts to launch shaded jars with the current project version instead of 0.1-SNAPSHOT
- Refactor `JobLauncher` so that jobs can be launched from the `JobLauncher` main class as well as from a particular Beam application. This also allows the job ID to be received from the application itself.
- Use Google's AutoService from Beam to register `NemoRunner` as one of Beam's runners via the `NemoRunnerRegistrar`.
- Rename `NemoPipelineRunner` to `NemoRunner` to follow conventions.
- Use a default JSON string instead of a default `executor_json` file, so that a JSON file is not required by default.
- Add a MinimalWordCount example from the Beam homepage (from the quickstart article)
**Minor changes to note:**
- Fix minor typos (e.g., confg --> config)
- Fix minor indentation inconsistencies
**Tests for the changes:**
- Existing tests confirm that these changes do not break the code
**Other comments:**
- N/A
Closes #GITHUB_PR_NUMBER
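One way to read the `executor_json` change is as a fallback: read a file only when one is explicitly given, and otherwise use a built-in string. Below is a minimal sketch under that assumption. `ExecutorJsonSketch`, `resolveExecutorJson`, and the placeholder `DEFAULT_EXECUTOR_JSON` value are hypothetical. They do not reflect Nemo's real default executor configuration or option names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
 * Hypothetical sketch: prefer an explicitly given executor JSON file,
 * otherwise fall back to a built-in default string.
 */
final class ExecutorJsonSketch {
  /** Placeholder only; NOT Nemo's actual default executor configuration. */
  static final String DEFAULT_EXECUTOR_JSON = "[{\"placeholder\": true}]";

  /** Returns the file's contents if a path is given, else the built-in default string. */
  static String resolveExecutorJson(final String pathOrNull) {
    if (pathOrNull == null || pathOrNull.isEmpty()) {
      return DEFAULT_EXECUTOR_JSON; // no file is required by default
    }
    try {
      return new String(Files.readAllBytes(Paths.get(pathOrNull)), StandardCharsets.UTF_8);
    } catch (final IOException e) {
      // An explicitly provided but unreadable file is treated as an error, not a silent fallback.
      throw new UncheckedIOException("Cannot read executor JSON file: " + pathOrNull, e);
    }
  }

  public static void main(final String[] args) {
    System.out.println(resolveExecutorJson(args.length > 0 ? args[0] : null));
  }
}
```

Failing loudly on a bad explicit path is a design choice in this sketch. It keeps a typo in a user-supplied path from silently running with the default configuration.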
[GitHub] johnyangk closed pull request #162: [NEMO-303] Use Surefire 3.0.0-M1
johnyangk closed pull request #162: [NEMO-303] Use Surefire 3.0.0-M1 URL: https://github.com/apache/incubator-nemo/pull/162 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/pom.xml b/pom.xml
index 20cfc45af..7ed51fe03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
 2.13.0
 2.0.0-beta.5
+3.0.0-M1
 4.12
@@ -156,23 +157,28 @@ under the License.
 org.codehaus.mojo
 sonar-maven-plugin
+org.apache.maven.plugins
+maven-surefire-plugin
+${surefire.version}
+org.apache.maven.plugins
+maven-failsafe-plugin
+${surefire.version}
+integration-test
+verify
-org.apache.maven.plugins
-maven-failsafe-plugin
-2.20.1
-integration-test
-verify
 org.apache.maven.plugins
 maven-checkstyle-plugin
[GitHub] seojangho opened a new pull request #162: [NEMO-303] Use Surefire 3.0.0-M1
seojangho opened a new pull request #162: [NEMO-303] Use Surefire 3.0.0-M1 URL: https://github.com/apache/incubator-nemo/pull/162 JIRA: [NEMO-303: Use Surefire 3.0.0-M1](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-303)
**Minor changes to note:**
- POM: Use maven-surefire-plugin 3.0.0-M1 and maven-failsafe-plugin 3.0.0-M1
**Notes**
- See SUREFIRE-1588 in the ASF Jira for the relevant bug report
[GitHub] wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo
wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo URL: https://github.com/apache/incubator-nemo/pull/95#discussion_r236059450
## File path: .travis.yml ##
@@ -17,17 +17,29 @@
 # under the License.
 # .travis.yml
-# For maven builds
-language: java
+language:
+ # For maven builds
+ - java
+ # For web ui build
+ - node_js
 dist: trusty
+<<< HEAD
Review comment: This must have gotten in there by mistake.
[GitHub] johnyangk closed pull request #161: [NEMO-300] Fix starvation when handling multiple pending data fetchers
johnyangk closed pull request #161: [NEMO-300] Fix starvation when handling multiple pending data fetchers URL: https://github.com/apache/incubator-nemo/pull/161 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index b08b12a25..af3197999 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -461,31 +461,34 @@ private boolean handleDataFetchers(final List fetchers) {
 final Iterator pendingIterator = pendingFetchers.iterator();
 final long currentTime = System.currentTimeMillis();
- // We check pending data every polling interval
- while (pendingIterator.hasNext()
-&& isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+
+
+ if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+// We check pending data every polling interval
 prevPollingTime = currentTime;
-final DataFetcher dataFetcher = pendingIterator.next();
-try {
- final Object element = dataFetcher.fetchDataElement();
- onEventFromDataFetcher(element, dataFetcher);
+while (pendingIterator.hasNext()) {
+ final DataFetcher dataFetcher = pendingIterator.next();
+ try {
+final Object element = dataFetcher.fetchDataElement();
+onEventFromDataFetcher(element, dataFetcher);
+
+// We processed data. This means the data fetcher is now available.
+// Add current data fetcher to available
+pendingIterator.remove();
+if (!(element instanceof Finishmark)) {
+ availableFetchers.add(dataFetcher);
+}
- // We processed data. This means the data fetcher is now available.
- // Add current data fetcher to available
- pendingIterator.remove();
- if (!(element instanceof Finishmark)) {
-availableFetchers.add(dataFetcher);
+ } catch (final NoSuchElementException e) {
+// The current data fetcher is still pending.. try next data fetcher
+ } catch (final IOException e) {
+// IOException means that this task should be retried.
+taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+ Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
+return false;
 }
-
-} catch (final NoSuchElementException e) {
- // The current data fetcher is still pending.. try next data fetcher
-} catch (final IOException e) {
- // IOException means that this task should be retried.
- taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
-Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
- LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
- return false;
 }
 }
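The sketch below is a minimal model of why the old loop starved. It assumes `isPollingTime` returns true once `currentTime - prevPollingTime >= pollingInterval`; its body is not shown in the diff. The old code set `prevPollingTime = currentTime` inside a loop whose condition re-checked `isPollingTime`. With a positive interval, the second iteration therefore always failed the check, so at most one pending fetcher was polled per interval. Whenever that fetcher was still pending, it stayed at the head of the list and was the one polled again next time. Here fetchers are plain strings, and `PollingSketch`, `pollBuggy`, and `pollFixed` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical model of the NEMO-300 fix. "Polling" a fetcher just records its name.
 * isPollingTime() is an assumed definition, not Nemo's code.
 */
final class PollingSketch {
  private final long pollingIntervalMs; // assumed > 0
  private long prevPollingTime = 0;

  PollingSketch(final long pollingIntervalMs) {
    this.pollingIntervalMs = pollingIntervalMs;
  }

  private boolean isPollingTime(final long currentTime) {
    return currentTime - prevPollingTime >= pollingIntervalMs;
  }

  /** Old shape: the time check is in the loop condition and prevPollingTime is reset inside it. */
  List<String> pollBuggy(final List<String> pending, final long currentTime) {
    final List<String> polled = new ArrayList<>();
    final Iterator<String> it = pending.iterator();
    while (it.hasNext() && isPollingTime(currentTime)) {
      prevPollingTime = currentTime; // makes isPollingTime() false on the next iteration
      polled.add(it.next());
    }
    return polled; // at most one fetcher per interval, always starting from the head
  }

  /** New shape: check the time once, then visit every pending fetcher. */
  List<String> pollFixed(final List<String> pending, final long currentTime) {
    final List<String> polled = new ArrayList<>();
    if (isPollingTime(currentTime)) {
      prevPollingTime = currentTime;
      for (final String fetcher : pending) {
        polled.add(fetcher);
      }
    }
    return polled;
  }

  public static void main(final String[] args) {
    final List<String> pending = Arrays.asList("a", "b", "c");
    System.out.println("buggy: " + new PollingSketch(10).pollBuggy(pending, 100));
    System.out.println("fixed: " + new PollingSketch(10).pollFixed(pending, 100));
  }
}
```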
[GitHub] taegeonum closed pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum closed pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index a6269a085..6a6ca4d43 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -20,7 +20,6 @@
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
 import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.*;
@@ -258,14 +257,6 @@ private void sinkCheck() {
 * Helper method to check that all execution properties are correct and makes sense.
 */
 private void executionPropertyCheck() {
-// SideInput is not compatible with Push
-vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
-.filter(e -> e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-.filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
-.forEach(e -> {
- throw new CompileTimeOptimizationException("DAG execution property check: "
- + "Broadcast edge is not compatible with push" + e.getId());
-}));
 // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
 vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
 .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index e676e6e8c..1055f0bb9 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -49,7 +49,7 @@ public boolean equals(final Object o) {
 @Override
 public String toString() {
-return String.valueOf(timestamp);
+return String.valueOf("Watermark(" + timestamp + ")");
 }
 @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
new file mode 100644
index 0..9a18b0c9f
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Accumulates and provides side inputs in memory.
+ */
+public final class InMemorySideInputReader implements ReadyCheckingSideInputReader {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemorySideInputReader.class.getName());
+
+ private long curWatermark = Long.MIN_VALUE;
+
+ private final Collection> sideInputsToRead;
+ private final Map,
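Only part of the new `InMemorySideInputReader` is visible in this archive. As a rough, stdlib-only illustration of what an in-memory, ready-checking side input store does, the sketch below keys values by view and window. String ids stand in for Beam's `PCollectionView` and `BoundedWindow`, and the class and method names are hypothetical. Its `isReady`/`get` pair loosely mirrors the shape of Beam's `ReadyCheckingSideInputReader.isReady(view, window)` and `SideInputReader.get(view, window)`, but this is not Nemo's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified analogue of an in-memory, ready-checking side input store.
 * NOT Nemo's InMemorySideInputReader: ids are Strings instead of Beam types.
 */
final class SideInputStoreSketch<T> {
  // viewId -> (windowId -> materialized side input value)
  private final Map<String, Map<String, T>> store = new HashMap<>();

  /** Stores a side input value for the given view and window (in this sketch, the latest value wins). */
  void addSideInputElement(final String viewId, final String windowId, final T value) {
    store.computeIfAbsent(viewId, k -> new HashMap<>()).put(windowId, value);
  }

  /** A (view, window) pair is ready once a value for it has arrived. */
  boolean isReady(final String viewId, final String windowId) {
    final Map<String, T> byWindow = store.get(viewId);
    return byWindow != null && byWindow.containsKey(windowId);
  }

  /** Returns the value if ready; a caller would push back main-input elements otherwise. */
  Optional<T> get(final String viewId, final String windowId) {
    final Map<String, T> byWindow = store.get(viewId);
    return byWindow == null ? Optional.empty() : Optional.ofNullable(byWindow.get(windowId));
  }

  public static void main(final String[] args) {
    final SideInputStoreSketch<Integer> reader = new SideInputStoreSketch<>();
    System.out.println("ready before: " + reader.isReady("view-0", "global"));
    reader.addSideInputElement("view-0", "global", 42);
    System.out.println("value after: " + reader.get("view-0", "global").orElse(-1));
  }
}
```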
[GitHub] johnyangk commented on issue #160: [NEMO-2186] Parallelism=1 for PCollectionView
johnyangk commented on issue #160: [NEMO-2186] Parallelism=1 for PCollectionView URL: https://github.com/apache/incubator-nemo/pull/160#issuecomment-440575480 @taegeonum Please review this after #159 is merged.
[GitHub] johnyangk opened a new pull request #160: [NEMO-2186] Parallelism=1 for PCollectionView
johnyangk opened a new pull request #160: [NEMO-2186] Parallelism=1 for PCollectionView URL: https://github.com/apache/incubator-nemo/pull/160 JIRA: [NEMO-286: Parallelism=1 for PCollectionView](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-286)
**Major changes:**
- Set Parallelism=1 for side input stages
- Set CommPattern=Broadcast from/to side input stages
[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235279915
## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java ##
@@ -132,10 +145,14 @@ public final DoFn getDoFn() {
 * Checks whether the bundle is finished or not.
 * Starts the bundle if it is done.
 */
- protected final void checkAndInvokeBundle() {
+ final void checkAndInvokeBundle() {
 if (bundleFinished) {
 bundleFinished = false;
- doFnRunner.startBundle();
+ if (pushBackRunner == null) {
Review comment: That will give us 3 duplicate methods. I left a comment on refactoring. Maybe we can come back to this when implementing the streaming partial combiner (i.e., combining ReduceFn), which will probably also use pushBackRunner?
[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235279915
## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java ##
@@ -132,10 +145,14 @@ public final DoFn getDoFn() {
 * Checks whether the bundle is finished or not.
 * Starts the bundle if it is done.
 */
- protected final void checkAndInvokeBundle() {
+ final void checkAndInvokeBundle() {
 if (bundleFinished) {
 bundleFinished = false;
- doFnRunner.startBundle();
+ if (pushBackRunner == null) {
Review comment: That will give us 3 duplicate methods. I left a comment on refactoring. Maybe we can come back to this when implementing the streaming partial combiner (i.e., combining ReduceFn)?
[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277020
## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java ##
@@ -132,25 +150,46 @@ public final DoFn getDoFn() {
 * Checks whether the bundle is finished or not.
 * Starts the bundle if it is done.
 */
- protected final void checkAndInvokeBundle() {
+ final void checkAndInvokeBundle() {
 if (bundleFinished) {
 bundleFinished = false;
- doFnRunner.startBundle();
+ if (pushBackRunner == null) {
+doFnRunner.startBundle();
+ } else {
+pushBackRunner.startBundle();
+ }
 prevBundleStartTime = System.currentTimeMillis();
 currBundleCount = 0;
 }
 currBundleCount += 1;
 }
-
 /**
 * Checks whether it is time to finish the bundle and finish it.
 */
- protected final void checkAndFinishBundle() {
+ final void checkAndFinishBundle() {
 if (!bundleFinished) {
 if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
 bundleFinished = true;
+if (pushBackRunner == null) {
+ doFnRunner.finishBundle();
+} else {
+ pushBackRunner.finishBundle();
+}
+ }
+}
+ }
+
+ /**
+ * Finish bundle without checking for conditions.
+ */
+ final void forceFinishBundle() {
Review comment: This is also used in AbstractDoFnTransform#close
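The bundle logic quoted above can be modeled in isolation. This sketch is a simplification under stated assumptions. `Runner` is a hypothetical interface standing in for both Beam's `DoFnRunner` and `PushbackSideInputDoFnRunner`, which both expose `startBundle()` and `finishBundle()`. Choosing the runner once at construction time is one possible way to avoid the duplicated `pushBackRunner == null` checks discussed in the review; it is not what the PR does. The body of `forceFinishBundle` is also assumed, because the diff is cut off at its signature. A `LongSupplier` clock replaces `System.currentTimeMillis()` so that the time-based limit can be tested.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical model of size- and time-bounded bundling (not Nemo's AbstractDoFnTransform).
 */
final class BundleSketch {
  /** Stand-in for any runner exposing startBundle()/finishBundle(). */
  interface Runner {
    void startBundle();

    void finishBundle();
  }

  private final Runner runner; // chosen once, so no per-call null checks
  private final long bundleSize;
  private final long bundleMillis;
  private final LongSupplier clock; // e.g. System::currentTimeMillis in production
  private boolean bundleFinished = true;
  private long prevBundleStartTime;
  private long currBundleCount;

  BundleSketch(final Runner runner, final long bundleSize, final long bundleMillis, final LongSupplier clock) {
    this.runner = runner;
    this.bundleSize = bundleSize;
    this.bundleMillis = bundleMillis;
    this.clock = clock;
  }

  /** Starts a new bundle if the previous one is finished, then counts this element. */
  void checkAndInvokeBundle() {
    if (bundleFinished) {
      bundleFinished = false;
      runner.startBundle();
      prevBundleStartTime = clock.getAsLong();
      currBundleCount = 0;
    }
    currBundleCount += 1;
  }

  /** Finishes the open bundle once it holds bundleSize elements or is bundleMillis old. */
  void checkAndFinishBundle() {
    if (!bundleFinished
        && (currBundleCount >= bundleSize || clock.getAsLong() - prevBundleStartTime >= bundleMillis)) {
      bundleFinished = true;
      runner.finishBundle();
    }
  }

  /** Assumed behavior: finishes an open bundle unconditionally (e.g., on close); no-op otherwise. */
  void forceFinishBundle() {
    if (!bundleFinished) {
      bundleFinished = true;
      runner.finishBundle();
    }
  }

  public static void main(final String[] args) {
    final Runner printing = new Runner() {
      @Override
      public void startBundle() {
        System.out.println("start");
      }

      @Override
      public void finishBundle() {
        System.out.println("finish");
      }
    };
    final BundleSketch bundles = new BundleSketch(printing, 2, 1000, System::currentTimeMillis);
    for (int i = 0; i < 3; i++) {
      bundles.checkAndInvokeBundle();
      bundles.checkAndFinishBundle();
    }
    bundles.forceFinishBundle(); // closes the partially filled last bundle
  }
}
```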
[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277346
## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param input type.
+ * @param output type.
+ */
+public final class PushBackDoFnTransform extends AbstractDoFnTransform {
+ private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+ private List> curPushedBacks;
+ private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists.
+ private long curInputWatermark;
+ private long curOutputWatermark;
+
+ /**
+ * PushBackDoFnTransform Constructor.
+ */
+ public PushBackDoFnTransform(final DoFn doFn,
+ final Coder inputCoder,
+ final Map, Coder> outputCoders,
+ final TupleTag mainOutputTag,
+ final List> additionalOutputTags,
+ final WindowingStrategy windowingStrategy,
+ final Map> sideInputs,
+ final PipelineOptions options,
+ final DisplayData displayData) {
+super(doFn, inputCoder, outputCoders, mainOutputTag,
+ additionalOutputTags, windowingStrategy, sideInputs, options, displayData);
+this.curPushedBacks = new ArrayList<>();
+this.curPushedBackWatermark = Long.MAX_VALUE;
+this.curInputWatermark = Long.MIN_VALUE;
+this.curOutputWatermark = Long.MIN_VALUE;
+ }
+
+ @Override
+ protected DoFn wrapDoFn(final DoFn initDoFn) {
+return initDoFn;
+ }
+
+ @Override
+ public void onData(final WindowedValue data) {
+// Need to distinguish side/main inputs and push-back main inputs.
+if (data.getValue() instanceof SideInputElement) {
+ // This element is a Side Input
+ // TODO #287: Consider Explicit Multi-Input IR Transform
+ final WindowedValue sideInputElement = (WindowedValue) data;
+ final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+ getSideInputReader().addSideInputElement(view, data);
+
+ handlePushBacks();
+
+ // See if we can emit a new watermark, as we may have processed some pushed-back elements
+ onWatermark(new Watermark(curInputWatermark));
+} else {
+ // This element is the Main Input
+ checkAndInvokeBundle();
+ final Iterable> pushedBack =
+getPushBackRunner().processElementInReadyWindows(data);
+ for (final WindowedValue wv : pushedBack) {
+curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis());
+curPushedBacks.add(wv);
+ }
+ checkAndFinishBundle();
+}
+ }
+
+ private void handlePushBacks() {
+// Force-flush, before (possibly) processing pushed-back data.
+//
+// Main reason:
+// {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+// caches for each bundle the side inputs that are not ready.
+// We need to
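The push-back flow in `onData` can be illustrated with a model that has a single side input and a single window, where elements are just timestamps. Holding the output watermark at `min(curInputWatermark, curPushedBackWatermark)` is an assumption about why `curPushedBackWatermark` is tracked, because the quoted file is truncated before `onWatermark`. The class and method names are hypothetical, and real Beam windowing and bundling are ignored.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical push-back model (not Nemo's PushBackDoFnTransform).
 * Assumption: the output watermark must not pass the earliest element still waiting for its side input.
 */
final class PushBackSketch {
  private final List<Long> curPushedBacks = new ArrayList<>();
  private final List<Long> emitted = new ArrayList<>();
  private boolean sideInputReady = false;
  private long curPushedBackWatermark = Long.MAX_VALUE; // Long.MAX_VALUE when nothing is pushed back
  private long curInputWatermark = Long.MIN_VALUE;
  private long curOutputWatermark = Long.MIN_VALUE;

  /** Main input: process now if the side input is ready, otherwise push it back. */
  void onMainInput(final long timestamp) {
    if (sideInputReady) {
      emitted.add(timestamp);
    } else {
      curPushedBacks.add(timestamp);
      curPushedBackWatermark = Math.min(curPushedBackWatermark, timestamp);
    }
  }

  /** Side input: mark it ready, replay pushed-back elements, then try to advance the watermark. */
  void onSideInput() {
    sideInputReady = true;
    emitted.addAll(curPushedBacks);
    curPushedBacks.clear();
    curPushedBackWatermark = Long.MAX_VALUE;
    onWatermark(curInputWatermark);
  }

  /** Advances the output watermark, but never past the earliest pushed-back element. */
  void onWatermark(final long inputWatermark) {
    curInputWatermark = inputWatermark;
    final long candidate = Math.min(curInputWatermark, curPushedBackWatermark);
    if (candidate > curOutputWatermark) {
      curOutputWatermark = candidate; // watermarks only move forward
    }
  }

  long getOutputWatermark() {
    return curOutputWatermark;
  }

  int pushedBackCount() {
    return curPushedBacks.size();
  }

  int emittedCount() {
    return emitted.size();
  }

  public static void main(final String[] args) {
    final PushBackSketch transform = new PushBackSketch();
    transform.onMainInput(5L);
    transform.onWatermark(10L);
    System.out.println("before side input: " + transform.getOutputWatermark());
    transform.onSideInput();
    System.out.println("after side input: " + transform.getOutputWatermark());
  }
}
```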
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277341
## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ##
[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235275944 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param input type.
+ * @param output type.
+ */
+public final class PushBackDoFnTransform extends AbstractDoFnTransform {
+  private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+  private List> curPushedBacks;
+  private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists.
+  private long curInputWatermark;
+  private long curOutputWatermark;
+
+  /**
+   * PushBackDoFnTransform Constructor.
+   */
+  public PushBackDoFnTransform(final DoFn doFn,
+                               final Coder inputCoder,
+                               final Map, Coder> outputCoders,
+                               final TupleTag mainOutputTag,
+                               final List> additionalOutputTags,
+                               final WindowingStrategy windowingStrategy,
+                               final Map> sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options, displayData);
+    this.curPushedBacks = new ArrayList<>();
+    this.curPushedBackWatermark = Long.MAX_VALUE;
+    this.curInputWatermark = Long.MIN_VALUE;
+    this.curOutputWatermark = Long.MIN_VALUE;
+  }
+
+  @Override
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
+  }
+
+  @Override
+  public void onData(final WindowedValue data) {
+    // Need to distinguish side/main inputs and push-back main inputs.
+    if (data.getValue() instanceof SideInputElement) {
+      // This element is a Side Input
+      // TODO #287: Consider Explicit Multi-Input IR Transform
+      final WindowedValue sideInputElement = (WindowedValue) data;
+      final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+      getSideInputReader().addSideInputElement(view, data);
+
+      handlePushBacks();
+
+      // See if we can emit a new watermark, as we may have processed some pushed-back elements
+      onWatermark(new Watermark(curInputWatermark));
+    } else {
+      // This element is the Main Input
+      checkAndInvokeBundle();
+      final Iterable> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(data);
+      for (final WindowedValue wv : pushedBack) {
+        curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis());
+        curPushedBacks.add(wv);
+      }
+      checkAndFinishBundle();
+    }
+  }
+
+  private void handlePushBacks() {
+    // Force-flush, before (possibly) processing pushed-back data.
+    //
+    // Main reason:
+    // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+    // caches for each bundle the side inputs that are not ready.
+    // We need to
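The `onData` hunk routes `SideInputElement`s to the side-input reader and, for main-input elements, buffers whatever `processElementInReadyWindows` pushes back while keeping `curPushedBackWatermark` at the smallest pushed-back timestamp. Below is a minimal, Beam-free sketch of that bookkeeping. It assumes the output watermark is held at the minimum of the input watermark and the earliest pushed-back timestamp (the hunk is cut off before the actual watermark logic), and `PushBackBuffer`, `Element`, and the readiness predicate are hypothetical names, not Nemo or Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: illustrates the push-back idea only, not Nemo's PushBackDoFnTransform.
final class PushBackBuffer {
  // Stand-in for a timestamped element (Beam's WindowedValue is not used here).
  static final class Element {
    final String value;
    final long timestampMillis;

    Element(final String value, final long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  private final List<Element> pushedBacks = new ArrayList<>();
  private final List<String> processed = new ArrayList<>();
  private long pushedBackWatermark = Long.MAX_VALUE; // Long.MAX_VALUE while nothing is pushed back.
  private long inputWatermark = Long.MIN_VALUE;

  // Main input: process now if its side input is ready, otherwise push it back.
  void onMainInput(final Element e, final boolean sideInputReady) {
    if (sideInputReady) {
      processed.add(e.value);
    } else {
      pushedBackWatermark = Math.min(pushedBackWatermark, e.timestampMillis);
      pushedBacks.add(e);
    }
  }

  // A side input arrived: retry pushed-back elements and recompute the hold.
  void onSideInputArrived(final Predicate<Element> isReady) {
    final List<Element> stillBlocked = new ArrayList<>();
    long newHold = Long.MAX_VALUE;
    for (final Element e : pushedBacks) {
      if (isReady.test(e)) {
        processed.add(e.value);
      } else {
        stillBlocked.add(e);
        newHold = Math.min(newHold, e.timestampMillis);
      }
    }
    pushedBacks.clear();
    pushedBacks.addAll(stillBlocked);
    pushedBackWatermark = newHold;
  }

  void onInputWatermark(final long watermarkMillis) {
    inputWatermark = watermarkMillis;
  }

  // Assumed rule: never advance past the input watermark or any held-back element.
  long outputWatermark() {
    return Math.min(inputWatermark, pushedBackWatermark);
  }

  List<String> processed() {
    return processed;
  }
}
```

Using `Long.MAX_VALUE` as the empty-buffer sentinel, as the original field comment does, lets `Math.min` fall through to the input watermark once nothing is held back.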
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247765 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ## @@ -0,0 +1,160 @@
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247665 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java ## @@ -132,25 +150,46 @@ public final DoFn getDoFn() {
    * Checks whether the bundle is finished or not.
    * Starts the bundle if it is done.
    */
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
     if (bundleFinished) {
       bundleFinished = false;
-      doFnRunner.startBundle();
+      if (pushBackRunner == null) {
+        doFnRunner.startBundle();
+      } else {
+        pushBackRunner.startBundle();
+      }
       prevBundleStartTime = System.currentTimeMillis();
       currBundleCount = 0;
     }
     currBundleCount += 1;
   }
-
   /**
    * Checks whether it is time to finish the bundle and finish it.
    */
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
     if (!bundleFinished) {
       if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
         bundleFinished = true;
+        if (pushBackRunner == null) {
+          doFnRunner.finishBundle();
+        } else {
+          pushBackRunner.finishBundle();
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
Review comment: This is only used for PushBackRunner.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
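This hunk makes `checkAndInvokeBundle` and `checkAndFinishBundle` delegate to `pushBackRunner` when one exists, and adds `forceFinishBundle`, whose body is cut off. Below is a hedged sketch of the size- and time-bounded bundle lifecycle. `BundleController`, the injected clock, and the `Runnable` callbacks (standing in for `doFnRunner`/`pushBackRunner`) are hypothetical, and the guard inside `forceFinishBundle` is an assumption about what the truncated method does.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of size/time-bounded bundling; not Nemo's AbstractDoFnTransform.
final class BundleController {
  private final long bundleSize;
  private final long bundleMillis;
  private final LongSupplier clock;     // e.g. System::currentTimeMillis
  private final Runnable startBundle;   // stand-in for doFnRunner/pushBackRunner.startBundle()
  private final Runnable finishBundle;  // stand-in for doFnRunner/pushBackRunner.finishBundle()

  private boolean bundleFinished = true;
  private long prevBundleStartTime;
  private long currBundleCount;

  BundleController(final long bundleSize, final long bundleMillis, final LongSupplier clock,
                   final Runnable startBundle, final Runnable finishBundle) {
    this.bundleSize = bundleSize;
    this.bundleMillis = bundleMillis;
    this.clock = clock;
    this.startBundle = startBundle;
    this.finishBundle = finishBundle;
  }

  // Opens a bundle if none is open, then counts the incoming element.
  void checkAndInvokeBundle() {
    if (bundleFinished) {
      bundleFinished = false;
      startBundle.run();
      prevBundleStartTime = clock.getAsLong();
      currBundleCount = 0;
    }
    currBundleCount += 1;
  }

  // Closes the open bundle once it holds enough elements or is old enough.
  void checkAndFinishBundle() {
    if (!bundleFinished
        && (currBundleCount >= bundleSize
            || clock.getAsLong() - prevBundleStartTime >= bundleMillis)) {
      bundleFinished = true;
      finishBundle.run();
    }
  }

  // Closes the open bundle regardless of size or age (assumed behavior of forceFinishBundle).
  void forceFinishBundle() {
    if (!bundleFinished) {
      bundleFinished = true;
      finishBundle.run();
    }
  }
}
```

The clock is injected instead of calling `System.currentTimeMillis()` directly only so the time threshold can be exercised deterministically in this sketch.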
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247750 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ## @@ -0,0 +1,160 @@
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249217 ## File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java ## @@ -104,6 +104,7 @@ private boolean isWatermarkTriggerTime() {
   private Object retrieveElement() throws NoSuchElementException, IOException {
     // Emit watermark
     if (!bounded && isWatermarkTriggerTime()) {
+      // index=0 as there is only 1 input stream
Review comment: please remove this comment
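The `retrieveElement` hunk emits a watermark only for unbounded sources, and only when `isWatermarkTriggerTime()` holds; that method's body is not shown. A minimal sketch follows, assuming a fixed wall-clock period between watermarks. `WatermarkTrigger` and `periodMillis` are hypothetical names, not Nemo's actual fields.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of periodic watermark triggering; not Nemo's SourceVertexDataFetcher.
final class WatermarkTrigger {
  private final boolean bounded;
  private final long periodMillis;
  private final LongSupplier clock;
  private long lastTriggerTime;

  WatermarkTrigger(final boolean bounded, final long periodMillis, final LongSupplier clock) {
    this.bounded = bounded;
    this.periodMillis = periodMillis;
    this.clock = clock;
    this.lastTriggerTime = clock.getAsLong();
  }

  // True at most once per period, and never for a bounded source.
  boolean shouldEmitWatermark() {
    if (bounded) {
      return false;
    }
    final long now = clock.getAsLong();
    if (now - lastTriggerTime >= periodMillis) {
      lastTriggerTime = now; // reset the timer so the next watermark waits a full period
      return true;
    }
    return false;
  }
}
```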
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249002 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ## @@ -0,0 +1,160 @@
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] Support side inputs and windowing URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249018 ## File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java ## @@ -0,0 +1,160 @@