[GitHub] wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
wonook commented on issue #180: [NEMO-319] Fix path to beam resources in 
examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447261325
 
 
   Lovely! It looks good to me.  I'll merge after the tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests

2018-12-14 Thread GitBox
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam 
ValidatesRunner tests
URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447259465
 
 
   Oh, and I forgot to mention: I think making an über jar is the right direction.




[GitHub] kennknowles commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on issue #180: [NEMO-319] Fix path to beam resources in 
examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447259985
 
 
   OK, all the edits are done, I think.




[GitHub] wonook commented on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests

2018-12-14 Thread GitBox
wonook commented on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests
URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447259465
 
 
   Oh, and I forgot to mention: making an über jar is the right direction.




[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests

2018-12-14 Thread GitBox
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam 
ValidatesRunner tests
URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102
 
 
   @kennknowles I think referring to our `nemo-examples-beam` package `pom.xml` 
could help in creating the right deps & bundling & distribution. To my knowledge, 
it is capable of running various Beam programs with the specified modules. To 
provide a bit more information, the `com.github.fommil.netlib` library was used to 
implement the ALS example, and the others serve other purposes, so those would be 
irrelevant. So I think adding the following lines to your pom.xml will help. I 
hope this helps!
   
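   ```xml
   <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>${hadoop.version}</version>
     <exclusions>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
       </exclusion>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
   ```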




[GitHub] wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam ValidatesRunner tests

2018-12-14 Thread GitBox
wonook edited a comment on issue #182: [NEMO-292] (WIP) Run Beam 
ValidatesRunner tests
URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102
 
 
   @kennknowles I think referring to our `nemo-examples-beam` package `pom.xml` 
could help in creating the right deps & bundling & distribution. To my knowledge, 
it is capable of running various Beam programs with the specified modules. To 
provide a bit more information, the `com.github.fommil.netlib` library was used to 
implement the ALS example, so it would be irrelevant, but try adding the 
following lines to your pom.xml. I hope this helps!
   
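   ```xml
   <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>${hadoop.version}</version>
     <exclusions>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
       </exclusion>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
   ```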




[GitHub] wonook commented on issue #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
wonook commented on issue #180: [NEMO-319] Fix path to beam resources in 
examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#issuecomment-447257364
 
 
   @kennknowles Thanks for the quick reaction! Just one favor to ask regarding 
the PR: in the README, there's an `Examples` section near the very bottom 
of the document. Would it be possible to make the same changes there as well? I 
think it would be nice to handle all of the relevant issues at once.
   I'll squash & merge the PR as soon as that's handled, so you won't need to rebase 
your commits.
   Thanks a lot! 




[GitHub] wonook commented on issue #182: (WIP) Add Beam spec tests

2018-12-14 Thread GitBox
wonook commented on issue #182: (WIP) Add Beam spec tests
URL: https://github.com/apache/incubator-nemo/pull/182#issuecomment-447255102
 
 
   @kennknowles I think referring to our `nemo-examples-beam` package `pom.xml` 
could help in creating the right deps & bundling & distribution. To my knowledge, 
it is capable of running various Beam programs with the specified modules. To 
provide a bit more information, the `com.github.fommil.netlib` library was used to 
implement the ALS example, so it would be irrelevant, but try adding the 
following lines to your pom.xml. I hope this helps!
   
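   ```xml
   <dependency>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-extensions-sql</artifactId>
     <version>${beam.version}</version>
   </dependency>
   <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-mapreduce-client-core</artifactId>
     <version>${hadoop.version}</version>
     <scope>provided</scope>
   </dependency>
   <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>${hadoop.version}</version>
     <exclusions>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
       </exclusion>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
   ```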




[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to 
beam resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676353
 
 

 ##
 File path: README.md
 ##
 @@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
    -deploy_mode yarn \
    -job_id mr_transient \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   Done.




[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to 
beam resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676326
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
    -optimization_policy org.apache.nemo.compiler.optimizer.policy.DefaultPolicy \
    -user_main org.apache.nemo.examples.beam.WordCount \
    -user_args "`pwd`/examples/resources/test_input_wordcount `pwd`/examples/resources/test_output_wordcount"
 
 Review comment:
   Done.




[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to 
beam resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676368
 
 

 ##
 File path: README.md
 ##
 @@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
    -deploy_mode yarn \
    -job_id mr_transient \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   Done.




[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to 
beam resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676124
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   Done.




[GitHub] kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
kennknowles commented on a change in pull request #180: [NEMO-319] Fix path to 
beam resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241676093
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   Done.




[GitHub] wonook commented on issue #181: [NEMO-159] Nemo Web UI

2018-12-14 Thread GitBox
wonook commented on issue #181: [NEMO-159] Nemo Web UI
URL: https://github.com/apache/incubator-nemo/pull/181#issuecomment-447252608
 
 
   @seojangho I've updated the tests. Please check again!




[GitHub] kennknowles opened a new pull request #182: (WIP) Add Beam spec tests

2018-12-14 Thread GitBox
kennknowles opened a new pull request #182: (WIP) Add Beam spec tests
URL: https://github.com/apache/incubator-nemo/pull/182
 
 
   Opening this for early feedback. I spent a little while trying to get this 
to run, but did not finish it. I hope that some member of the Nemo community 
can easily point to my error.
   
   In this draft:
   
- `compiler/frontend/beam/spectestsbundle` is an über jar with everything 
needed to run the tests
- `compiler/frontend/beam/spectests` is the maven surefire config to run 
the tests
   
   I still get `ClassNotFound: CoderRegistry` in every test case, so the deps & 
bundling & distribution of the code is clearly still not right.
   
   Previously, I tried doing this inline in `compiler/frontend/beam/pom.xml`. I 
got the same error and thought it might be something to do with how the jars 
are used to execute the tests, so (like most distributed big data systems) I 
built more logic to create an über jar. The simpler version is here: 
https://github.com/kennknowles/incubator-nemo/tree/first-try-spectests
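
When chasing a `ClassNotFound` such as the `CoderRegistry` error above, it can help to confirm which classes are actually visible at runtime before reworking the bundling. The following is a minimal, hypothetical diagnostic (`ClasspathCheck` is not part of Nemo or Beam). It assumes the thread context class loader is the relevant one, which may not hold inside a test harness that sets up its own loaders.

```java
import java.util.Optional;

// Hypothetical diagnostic helper, not part of Nemo or Beam.
final class ClasspathCheck {
  private ClasspathCheck() { }

  /** Returns true if the named class resolves via the context (or fallback) class loader. */
  static boolean isVisible(final String className) {
    final ClassLoader loader = Optional
        .ofNullable(Thread.currentThread().getContextClassLoader())
        .orElse(ClasspathCheck.class.getClassLoader());
    try {
      // initialize = false: resolve the class without running its static initializers.
      Class.forName(className, false, loader);
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      // LinkageError covers NoClassDefFoundError for classes whose dependencies are missing.
      return false;
    }
  }

  public static void main(final String[] args) {
    // CoderRegistry ships in Beam's Java core SDK (beam-sdks-java-core).
    final String name = "org.apache.beam.sdk.coders.CoderRegistry";
    System.out.println(name + " visible: " + isVisible(name));
  }
}
```

Running it from the same classpath the tests use shows whether the core SDK jar made it into the bundle at all.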




[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
johnyangk commented on a change in pull request #180: Fix path to beam 
resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241665971
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
    -optimization_policy org.apache.nemo.compiler.optimizer.policy.DefaultPolicy \
    -user_main org.apache.nemo.examples.beam.WordCount \
    -user_args "`pwd`/examples/resources/test_input_wordcount `pwd`/examples/resources/test_output_wordcount"
 
 Review comment:
   Please change this line to:
   
   ```bash
   -user_args "`pwd`/examples/resources/inputs/test_input_wordcount `pwd`/outputs/wordcount"
   ```
   
   (1) the `inputs` directory was also added in #146
   (2) the `outputs` directory can be easier to find




[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
johnyangk commented on a change in pull request #180: Fix path to beam 
resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666839
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   executor -> executors




[GitHub] johnyangk commented on a change in pull request #180: Fix path to beam resources in examples in README

2018-12-14 Thread GitBox
johnyangk commented on a change in pull request #180: Fix path to beam 
resources in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241667063
 
 

 ##
 File path: README.md
 ##
 @@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
    -deploy_mode yarn \
    -job_id mr_transient \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   executor -> executors




[GitHub] wonook commented on a change in pull request #180: Fix path to beam resources in examples in README

2018-12-13 Thread GitBox
wonook commented on a change in pull request #180: Fix path to beam resources 
in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666959
 
 

 ##
 File path: README.md
 ##
 @@ -86,7 +86,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ./bin/run_beam.sh \
    -deploy_mode yarn \
    -job_id mr_transient \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   `executors` here as well.




[GitHub] wonook commented on a change in pull request #180: Fix path to beam resources in examples in README

2018-12-13 Thread GitBox
wonook commented on a change in pull request #180: Fix path to beam resources 
in examples in README
URL: https://github.com/apache/incubator-nemo/pull/180#discussion_r241666859
 
 

 ##
 File path: README.md
 ##
 @@ -77,7 +77,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
 ## MapReduce example
 ./bin/run_beam.sh \
    -job_id mr_default \
-   -executor_json `pwd`/examples/resources/beam_test_executor_resources.json \
+   -executor_json `pwd`/examples/resources/executor/beam_test_executor_resources.json \
 
 Review comment:
   I think it should be `executors` here!




[GitHub] seojangho commented on issue #181: [NEMO-159] Nemo Web UI

2018-12-13 Thread GitBox
seojangho commented on issue #181: [NEMO-159] Nemo Web UI
URL: https://github.com/apache/incubator-nemo/pull/181#issuecomment-447240855
 
 
   License check, please. All files should be licensed to ASF.
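
For a license check like the one requested above, Apache projects commonly rely on Apache RAT. The following is only a small, hypothetical illustration of the idea. It scans source files and reports those whose first part lacks the ASF header marker that appears in the Java files quoted later in this thread. `LicenseHeaderCheck` is a made-up name, and the `.js` extension used in `main` is an assumption about the Web UI sources.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not Nemo tooling: lists files that lack an ASF header.
final class LicenseHeaderCheck {
  // Marker taken from the ASF header text quoted in this thread.
  static final String ASF_MARKER = "Licensed to the Apache Software Foundation (ASF)";
  // Only the top of a file is searched, where the header is expected to be.
  static final int HEADER_WINDOW = 2000;

  private LicenseHeaderCheck() { }

  static boolean hasAsfHeader(final String content) {
    final String head = content.substring(0, Math.min(content.length(), HEADER_WINDOW));
    return head.contains(ASF_MARKER);
  }

  static List<Path> missingHeaders(final Path root, final String extension) throws IOException {
    try (Stream<Path> files = Files.walk(root)) {
      return files
          .filter(Files::isRegularFile)
          .filter(p -> p.toString().endsWith(extension))
          .filter(p -> !hasAsfHeader(read(p)))
          .collect(Collectors.toList());
    }
  }

  private static String read(final Path file) {
    try {
      return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(final String[] args) throws IOException {
    final Path root = Paths.get(args.length > 0 ? args[0] : ".");
    // ".js" is an assumed extension for the Web UI sources.
    missingHeaders(root, ".js").forEach(System.out::println);
  }
}
```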


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] wonook opened a new pull request #181: [NEMO-159] Nemo Web UI

2018-12-13 Thread GitBox
wonook opened a new pull request #181: [NEMO-159] Nemo Web UI
URL: https://github.com/apache/incubator-nemo/pull/181
 
 
   JIRA: [NEMO-159: Nemo Web 
UI](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-159)
   
   **Major changes:**
   - Provides structure to the Web UI application
   - Creates a number of views (Stages View, Storage View, Jobs View, etc.)
   - Major update on the UI/UX of the Web UI to make visualization easily 
accessible to users
   - Major redesign of the DAG visualization to provide appropriate labels 
for the transforms.
   
   **Minor changes to note:**
   - Various code cleanups
   
   **Tests for the changes:**
   - Existing tests verify the code.
   
   **Other comments:**
   - None
   
   Closes #




[GitHub] kennknowles opened a new pull request #180: Fix path to beam resources in examples in README

2018-12-13 Thread GitBox
kennknowles opened a new pull request #180: Fix path to beam resources in 
examples in README
URL: https://github.com/apache/incubator-nemo/pull/180
 
 
   Files in `examples/beam/resources` were moved in #146 but the commands in 
the README were not updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo

2018-12-13 Thread GitBox
johnyangk commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs 
by Nemo
URL: https://github.com/apache/incubator-nemo/pull/95#issuecomment-447217829
 
 
   Copyright looks good  




[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644565
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -134,14 +131,15 @@ public void onData(final WindowedValue> element) {
 
   /**
* Process the collected data and trigger timers.
-   * @param inputWatermark current input watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
-  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
-   final Instant processingTime,
+  private void processElementsAndTriggerTimers(final Instant processingTime,
 final Instant synchronizedTime) {
-for (final Map.Entry>> entry : keyToValues.entrySet()) {
 
 Review comment:
    for avoiding iterating over 'empty' keys.
   (e.g., keys from previous windows that we never encounter in the current 
window)
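
The idea in the comment above, visiting only keys that actually received data since the last trigger, can be sketched as a per-key buffer that is emptied after each trigger. This is a hypothetical illustration (`KeyedBuffer` and `processAndClear` are made-up names), not the code in the PR. It assumes per-key values can be discarded once they have been handed to the processor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch, not the PR's code: a per-key buffer that is emptied on
// every trigger, so keys seen only in earlier windows are never revisited.
final class KeyedBuffer<K, V> {
  private final Map<K, List<V>> keyToValues = new HashMap<>();

  void add(final K key, final V value) {
    keyToValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /**
   * Passes each key that received data since the last call to the processor,
   * then drops all entries. Returns how many keys were visited.
   */
  int processAndClear(final BiConsumer<K, List<V>> processor) {
    final int visited = keyToValues.size();
    keyToValues.forEach(processor);
    keyToValues.clear(); // nothing is carried over, so no "empty" keys remain
    return visited;
  }
}
```

The trade-off is that a key reappearing later pays for a fresh map entry, which is usually cheaper than scanning every key ever seen on each trigger.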


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241641897
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/NemoTimerInternals.java
 ##
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.*;
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.state.*;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Nemo timer internals that add/remove timer data to timer context.
+ * @param <K> key type
+ */
+public final class NemoTimerInternals<K> implements TimerInternals {
+
+  /** The current set timers by namespace and ID. */
+  private final Table<StateNamespace, String, TimerInternals.TimerData> existingTimers = HashBasedTable.create();
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  private final K key;
+  private final ContextForTimer<K> context;
+
+  public NemoTimerInternals(final K key,
+                            final ContextForTimer<K> context) {
+    this.key = key;
+    this.context = context;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  public void setCurrentOutputWatermarkTime(final Instant time) {
+    outputWatermarkTime = time;
+  }
+
+  @Override
+  public void setTimer(
+    final StateNamespace namespace, final String timerId, final Instant target, final TimeDomain timeDomain) {
+    setTimer(TimerInternals.TimerData.of(timerId, namespace, target, timeDomain));
+  }
+
+  /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}. */
+  @Deprecated
+  @Override
+  public void setTimer(final TimerInternals.TimerData timerData) {
+    @Nullable
+    final TimerInternals.TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId());
+    if (existing == null) {
+      existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+      context.addTimer(key, timerData);
+    } else {
+      checkArgument(
+        timerData.getDomain().equals(existing.getDomain()),
+        "Attempt to set %s for time domain %s, but it is already set for time domain %s",
+        timerData.getTimerId(),
+        timerData.getDomain(),
+        existing.getDomain());
+
+      if (!timerData.getTimestamp().equals(existing.getTimestamp())) {
+        context.removeTimer(key, existing);
 
 Review comment:
   Is it okay to remove the existing timer? Can you add a comment?
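
Beam documents the combination of namespace and timer ID as uniquely identifying a timer, with a second `setTimer` for the same identity replacing the first. A new timestamp is therefore a reset, and the stale entry has to leave the pending queue or it could still fire at the old time. A simplified, hypothetical model of that behavior follows. `ResettableTimers` and its methods are illustrative names, timestamps are plain `long`s instead of `Instant`s, and time domains are omitted.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified model of timer reset; not Nemo's or Beam's code.
final class ResettableTimers {
  /** A pending timer: identity is (namespace, id); timestamps are plain longs. */
  static final class Timer {
    final String namespace;
    final String id;
    final long timestamp;

    Timer(final String namespace, final String id, final long timestamp) {
      this.namespace = namespace;
      this.id = id;
      this.timestamp = timestamp;
    }
  }

  // Lookup of the currently set timer by its (namespace, id) identity.
  private final Map<List<String>, Timer> existing = new HashMap<>();
  // Pending timers in firing order; the tie-breakers keep distinct timers distinct.
  private final NavigableSet<Timer> pending = new TreeSet<>(
      Comparator.comparingLong((Timer t) -> t.timestamp)
          .thenComparing(t -> t.namespace)
          .thenComparing(t -> t.id));

  void setTimer(final String namespace, final String id, final long timestamp) {
    final List<String> identity = Arrays.asList(namespace, id);
    final Timer previous = existing.get(identity);
    if (previous != null) {
      if (previous.timestamp == timestamp) {
        return; // already set for this time; nothing changes
      }
      // A reset: remove the stale entry so it cannot fire at the old time.
      pending.remove(previous);
    }
    final Timer timer = new Timer(namespace, id, timestamp);
    existing.put(identity, timer);
    pending.add(timer);
  }

  int pendingCount() {
    return pending.size();
  }

  long earliestTimestamp() {
    return pending.first().timestamp;
  }
}
```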




[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241639326
 
 

 ##
 File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/ContextForTimer.java
 ##
 @@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.nemo.common.Pair;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * This class contains necessary data for timers.
+ * @param <K> key type
+ */
+final class ContextForTimer<K> {
+  // Pending input watermark timers of all keys, in timestamp order.
+  private final NavigableSet<Pair<K, TimerInternals.TimerData>> watermarkTimers;
+
+  // Pending processing time timers of all keys, in timestamp order.
+  private final NavigableSet<Pair<K, TimerInternals.TimerData>> processingTimers;
+
+  // Pending synchronized processing time timers of all keys, in timestamp order.
+  private final NavigableSet<Pair<K, TimerInternals.TimerData>> synchronizedProcessingTimers;
+
+  // Current input watermark.
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // Current processing time.
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // Current synchronized processing time.
+  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  // map that holds timer internals of each key
+  private final Map<K, NemoTimerInternals> timerInternalsMap;
+
+  /**
+   * This comparator first compares the timer data, in timestamp order.
+   * If two timer data have the same timestamp, we order them by key.
+   * As the key is not comparable, we convert it to string and compare the string value.
+   * In fact, the key ordering is not important, because the first ordering is determined by the timestamp.
+   * We only have to check whether the two keys are the same or not.
+   */
+  private final Comparator<Pair<K, TimerInternals.TimerData>> comparator = (o1, o2) -> {
+    final int comp = o1.right().compareTo(o2.right());
+    if (comp == 0) {
+      // if two timer are the same, compare key
+      if (o1.left() == null && o2.left() == null) {
+        return 0;
+      } else if (o1.left() == null || o2.left() == null) {
+        return -1;
+      } else if (o1.left().equals(o2.left())) {
+        return 0;
+      } else {
+        return o1.left().toString().compareTo(o2.left().toString());
 
 Review comment:
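   This can return 0 for two different keys (whenever their `toString()` values are equal), which makes the comparator inconsistent with `equals`: a `TreeSet` would then treat the two timers as duplicates.
   Would it make sense to return a static number (e.g., 1) here, since the ordering of the keys doesn't matter?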
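A minimal, self-contained sketch of the concern above. It does not use the Nemo or Beam classes: `Key` and `Timer` are hypothetical stand-ins for the key type and for `Pair<K, TimerData>`, and `Key.toString()` is deliberately non-unique. The comparator copies the reviewed logic (timestamp first, then `equals()`, then `toString()`).

```java
import java.util.Comparator;
import java.util.TreeSet;

public class ToStringComparatorDemo {
  /** Hypothetical key type whose toString() is the same for every instance. */
  static final class Key {
    private final int id;

    Key(final int id) {
      this.id = id;
    }

    @Override
    public boolean equals(final Object o) {
      return o instanceof Key && ((Key) o).id == id;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(id);
    }

    @Override
    public String toString() {
      return "Key"; // deliberately not unique
    }
  }

  /** Hypothetical stand-in for a (key, timer data) pair: a key plus a timestamp. */
  static final class Timer {
    final Key key;
    final long timestamp;

    Timer(final Key key, final long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }
  }

  /** Mirrors the reviewed logic: timestamp first, then equals(), then toString(). */
  static final Comparator<Timer> BY_TIME_THEN_KEY_STRING = (a, b) -> {
    final int comp = Long.compare(a.timestamp, b.timestamp);
    if (comp != 0) {
      return comp;
    }
    if (a.key.equals(b.key)) {
      return 0;
    }
    return a.key.toString().compareTo(b.key.toString());
  };

  /** Adds two timers with distinct keys but the same timestamp and returns the set size. */
  static int countStoredTimers() {
    final TreeSet<Timer> timers = new TreeSet<>(BY_TIME_THEN_KEY_STRING);
    timers.add(new Timer(new Key(1), 100L));
    timers.add(new Timer(new Key(2), 100L));
    return timers.size();
  }

  public static void main(final String[] args) {
    // TreeSet deduplicates on compare() == 0, so the second timer is silently dropped.
    System.out.println(countStoredTimers()); // prints 1
  }
}
```

Returning a constant such as 1 would keep both timers, but `Comparator` also requires `sgn(compare(x, y)) == -sgn(compare(y, x))`, which a constant cannot satisfy, so lookups and removals on a `TreeSet` built with it may fail.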


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645005
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   protected void beforeClose() {
 // Finish any pending windows by advancing the input watermark to infinity.
-processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-  BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   /**
-   * Trigger times for current key.
+   * Trigger times if next timers exist.
* When triggering, it emits the windowed data to downstream operators.
-   * @param key key
-   * @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
-  private void triggerTimers(final K key,
- final Watermark watermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
-final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-  inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-try {
-  timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
-  timerInternals.advanceProcessingTime(processingTime);
-  timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-} catch (final Exception e) {
-  throw new RuntimeException(e);
-}
+  private int triggerTimers(final Instant processingTime,
+final Instant synchronizedTime) {
+final ContextForTimer context = inMemoryTimerInternalsFactory.context;
+context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp()));
 
 Review comment:
   Maybe this lets us do without the `inputWatermark` field variable?




[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645416
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
   @Override
   protected void beforeClose() {
 // Finish any pending windows by advancing the input watermark to infinity.
-processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
-  BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
   /**
-   * Trigger times for current key.
+   * Trigger times if next timers exist.
* When triggering, it emits the windowed data to downstream operators.
-   * @param key key
-   * @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
-  private void triggerTimers(final K key,
- final Watermark watermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
-final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
-  inMemoryTimerInternalsFactory.timerInternalsForKey(key);
-try {
-  timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
-  timerInternals.advanceProcessingTime(processingTime);
-  timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
-} catch (final Exception e) {
-  throw new RuntimeException(e);
-}
+  private int triggerTimers(final Instant processingTime,
+final Instant synchronizedTime) {
+final ContextForTimer context = inMemoryTimerInternalsFactory.context;
+context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp()));
+context.setCurrentProcessingTime(processingTime);
+context.setCurrentSynchronizedProcessingTime(synchronizedTime);
 
-final List timerDataList = getEligibleTimers(timerInternals);
+// get next eligible timers
+final List> timers = getEligibleTimers();
 
-if (!timerDataList.isEmpty()) {
+for (final Pair timer : timers) {
   // Trigger timers and emit windowed data
   final KeyedWorkItem timerWorkItem =
-KeyedWorkItems.timersWorkItem(key, timerDataList);
+KeyedWorkItems.timersWorkItem(timer.left(), Collections.singletonList(timer.right()));
 
 Review comment:
   Minor question: Would it be worthwhile to group the timers by key first and then invoke `processElement` once per key, to minimize the number of invocations?
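A minimal sketch of the grouping idea, assuming the eligible timers arrive as a list already in timestamp order. `KeyedTimer` is a hypothetical stand-in for `Pair<K, TimerData>`, and timestamps are plain `long` millis rather than Beam `TimerData`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupTimersByKeySketch {
  /** Hypothetical (key, timestamp) pair standing in for Pair<K, TimerData>. */
  static final class KeyedTimer<K> {
    final K key;
    final long timestamp;

    KeyedTimer(final K key, final long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }
  }

  /**
   * Groups timers (already in timestamp order) by key.
   * LinkedHashMap keeps keys in order of their earliest timer,
   * and each list keeps that key's timers in timestamp order.
   */
  static <K> Map<K, List<Long>> groupByKey(final List<KeyedTimer<K>> timers) {
    final Map<K, List<Long>> byKey = new LinkedHashMap<>();
    for (final KeyedTimer<K> timer : timers) {
      byKey.computeIfAbsent(timer.key, k -> new ArrayList<>()).add(timer.timestamp);
    }
    return byKey;
  }

  public static void main(final String[] args) {
    final List<KeyedTimer<String>> timers = new ArrayList<>();
    timers.add(new KeyedTimer<>("a", 1L));
    timers.add(new KeyedTimer<>("b", 2L));
    timers.add(new KeyedTimer<>("a", 3L));
    final Map<String, List<Long>> byKey = groupByKey(timers);
    // One work item per key instead of one per timer: 2 invocations instead of 3.
    System.out.println(byKey); // prints {a=[1, 3], b=[2]}
  }
}
```

One trade-off to weigh: within a key the timers stay in timestamp order, but across keys the firing order changes (here `a@3` would fire before `b@2`).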




[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644760
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -52,15 +53,11 @@
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
   private Watermark prevOutputWatermark;
   private final Map keyAndWatermarkHoldMap;
+  private final WindowingStrategy windowingStrategy;
 
 Review comment:
   unused variable




[GitHub] johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-13 Thread GitBox
johnyangk commented on a change in pull request #178: [NEMO-317] Optimize 
triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241644905
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -52,15 +53,11 @@
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
   private Watermark prevOutputWatermark;
   private final Map keyAndWatermarkHoldMap;
+  private final WindowingStrategy windowingStrategy;
+  private Watermark inputWatermark;
 
 Review comment:
   Remove this variable and reuse 
`inMemoryTimerInternalsFactory.context#currentInputWatermarkTime`?
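A minimal sketch of the suggestion: the timer context becomes the single owner of the input watermark, and the transform reads it from there instead of keeping its own `inputWatermark` field. `TimerContext` and `Transform` are hypothetical stand-ins, and `java.time.Instant` (with `Instant.MAX` playing the role of `BoundedWindow.TIMESTAMP_MAX_VALUE`) replaces the Joda types so the example is self-contained.

```java
import java.time.Instant;

public class WatermarkContextSketch {
  /** Hypothetical stand-in for ContextForTimer: the single owner of the input watermark. */
  static final class TimerContext {
    private Instant currentInputWatermarkTime = Instant.MIN;

    void setCurrentInputWatermarkTime(final Instant time) {
      this.currentInputWatermarkTime = time;
    }

    Instant getCurrentInputWatermarkTime() {
      return currentInputWatermarkTime;
    }
  }

  /** Hypothetical transform that keeps no separate inputWatermark field. */
  static final class Transform {
    private final TimerContext context = new TimerContext();

    void onWatermark(final long watermarkMillis) {
      context.setCurrentInputWatermarkTime(Instant.ofEpochMilli(watermarkMillis));
    }

    void beforeClose() {
      // Advance the watermark to "infinity" to flush pending windows.
      context.setCurrentInputWatermarkTime(Instant.MAX);
    }

    /** Every reader (e.g. the timer-triggering logic) consults the context. */
    Instant currentWatermark() {
      return context.getCurrentInputWatermarkTime();
    }
  }

  public static void main(final String[] args) {
    final Transform transform = new Transform();
    transform.onWatermark(1_000L);
    System.out.println(transform.currentWatermark()); // prints 1970-01-01T00:00:01Z
    transform.beforeClose();
    System.out.println(transform.currentWatermark().equals(Instant.MAX)); // prints true
  }
}
```

Keeping one copy of the watermark removes the risk of the field and the context drifting apart, for example if `beforeClose()` updated only one of them.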




[GitHub] seojangho closed pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable

2018-12-12 Thread GitBox
seojangho closed pull request #179: [NEMO-318] Make nemo.common.Cloneable 
extend java.lang.Cloneable
URL: https://github.com/apache/incubator-nemo/pull/179
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/nemo/common/Cloneable.java 
b/common/src/main/java/org/apache/nemo/common/Cloneable.java
index f1c3eab41..5c32bfde8 100644
--- a/common/src/main/java/org/apache/nemo/common/Cloneable.java
+++ b/common/src/main/java/org/apache/nemo/common/Cloneable.java
@@ -29,7 +29,7 @@
  *
  * @param  the type of objects that this class can clone
  */
-public interface Cloneable> {
+public interface Cloneable {
 
   /**
* Creates and returns a copy of this object.


 




[GitHub] seojangho commented on issue #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable

2018-12-12 Thread GitBox
seojangho commented on issue #179: [NEMO-318] Make nemo.common.Cloneable extend 
java.lang.Cloneable
URL: https://github.com/apache/incubator-nemo/pull/179#issuecomment-446864939
 
 
   Oops. This PR was pointless. Sorry.




[GitHub] seojangho commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo

2018-12-12 Thread GitBox
seojangho commented on issue #95: [NEMO-25] Improve WebUI to use RESTful APIs 
by Nemo
URL: https://github.com/apache/incubator-nemo/pull/95#issuecomment-446861615
 
 
   @johnyangk @wonook Please check the updates. Thanks.




[GitHub] seojangho opened a new pull request #179: [NEMO-318] Make nemo.common.Cloneable extend java.lang.Cloneable

2018-12-12 Thread GitBox
seojangho opened a new pull request #179: [NEMO-318] Make nemo.common.Cloneable 
extend java.lang.Cloneable
URL: https://github.com/apache/incubator-nemo/pull/179
 
 
   JIRA: [NEMO-318: Make org.apache.nemo.common.Cloneable inherit 
java.lang.Cloneable](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-318)
   
   **Minor changes to note:**
   - Made `org.apache.nemo.common.Cloneable` inherit `java.lang.Cloneable` instead of itself
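A minimal sketch of what inheriting `java.lang.Cloneable` buys a typed copy interface. `TypedCloneable`, `getClone()` and `Box` are hypothetical names for illustration, not the Nemo interface itself: because the interface extends the `java.lang.Cloneable` marker, an implementor can use `Object.clone()` without it throwing `CloneNotSupportedException`.

```java
public class CloneableSketch {
  /**
   * Hypothetical typed copy interface. Extending java.lang.Cloneable marks every
   * implementor as cloneable, so Object.clone() will not throw CloneNotSupportedException.
   */
  interface TypedCloneable<T extends TypedCloneable<T>> extends java.lang.Cloneable {
    /** Creates and returns a copy of this object. */
    T getClone();
  }

  static final class Box implements TypedCloneable<Box> {
    private int value;

    Box(final int value) {
      this.value = value;
    }

    int getValue() {
      return value;
    }

    void setValue(final int value) {
      this.value = value;
    }

    @Override
    public Box getClone() {
      try {
        return (Box) super.clone(); // shallow, field-by-field copy
      } catch (final CloneNotSupportedException e) {
        throw new AssertionError("unreachable: Box is java.lang.Cloneable", e);
      }
    }
  }

  public static void main(final String[] args) {
    final Box original = new Box(7);
    final Box copy = original.getClone();
    copy.setValue(8);
    System.out.println(original.getValue() + " " + copy.getValue()); // prints 7 8
  }
}
```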




[GitHub] taegeonum opened a new pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform

2018-12-12 Thread GitBox
taegeonum opened a new pull request #178: [NEMO-317] Optimize triggering logic 
in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178
 
 
   JIRA: [NEMO-317: Optimize triggering logic in 
GroupByKeyAndWindowDoFnTransform](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-317)
   
   **Major changes:**
   - Create `NemoTimerInternals`, which adds timer data to the shared data structures of `ContextForTimer`.
   - Remove timers from `ContextForTimer` and trigger them directly, instead of iterating over all keys.
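A minimal sketch of the shared-structure approach described above: one ordered set holds the timers of all keys, and triggering polls only the eligible prefix instead of visiting every key. `Timer`, `setTimer` and `pollEligibleTimers` are hypothetical names, keys are `String`s, and treating a timer as eligible once its timestamp is strictly before the watermark is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class SharedTimerQueueSketch {
  /** Hypothetical timer: a key plus an event-time timestamp in millis. */
  static final class Timer {
    final String key;
    final long timestamp;

    Timer(final String key, final long timestamp) {
      this.key = key;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return key + "@" + timestamp;
    }
  }

  // One set shared by all keys, ordered by timestamp and then by key.
  // String keys give a total tie-breaker that is consistent with equals.
  private final NavigableSet<Timer> watermarkTimers = new TreeSet<>(
      Comparator.comparingLong((Timer t) -> t.timestamp).thenComparing(t -> t.key));

  /** Called whenever any key sets a timer. */
  void setTimer(final String key, final long timestamp) {
    watermarkTimers.add(new Timer(key, timestamp));
  }

  /** Removes and returns every timer strictly before the watermark, in timestamp order. */
  List<Timer> pollEligibleTimers(final long watermark) {
    final List<Timer> eligible = new ArrayList<>();
    while (!watermarkTimers.isEmpty() && watermarkTimers.first().timestamp < watermark) {
      eligible.add(watermarkTimers.pollFirst());
    }
    return eligible;
  }

  public static void main(final String[] args) {
    final SharedTimerQueueSketch queue = new SharedTimerQueueSketch();
    queue.setTimer("a", 5L);
    queue.setTimer("b", 1L);
    queue.setTimer("a", 10L);
    // Only the timers before the watermark are touched; no per-key scan is needed.
    System.out.println(queue.pollEligibleTimers(6L)); // prints [b@1, a@5]
  }
}
```

The cost of a trigger then depends on the number of eligible timers (each `pollFirst()` is logarithmic in the set size) rather than on the number of keys.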
   




[GitHub] jeongyooneo closed pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder

2018-12-10 Thread GitBox
jeongyooneo closed pull request #166: [NEMO-80] SLF4J: Failed to load class 
org.slf4j.impl.StaticLoggerBinder
URL: https://github.com/apache/incubator-nemo/pull/166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index b77c67f9e..ce18b0075 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,6 +140,16 @@ under the License.
 slf4j-log4j12
 ${slf4j.version}
 
+
+
+org.slf4j
+slf4j-simple
+1.6.2
+test
+
 
 
 


 




[GitHub] taegeonum closed pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side …

2018-12-10 Thread GitBox
taegeonum closed pull request #165: [NEMO-221] Confusing error messages for a 
failure at the client-side …
URL: https://github.com/apache/incubator-nemo/pull/165
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java 
b/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
index f17f284ae..213f9a1b9 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/ClientRPC.java
@@ -58,11 +58,16 @@
   private ClientRPC(final TransportFactory transportFactory,
 final LocalAddressProvider localAddressProvider,
 @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost,
-@Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) throws IOException {
+@Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) {
 transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(),
 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT);
 final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort);
-link = transport.open(clientAddress, ENCODER, LINK_LISTENER);
+try {
+  link = transport.open(clientAddress, ENCODER, LINK_LISTENER);
+} catch (final IOException e) {
+  throw new IllegalStateException("Failed to setup an RPC connection to the Client. "
++ "A failure at the client-side is suspected.");
+}
   }
 
   /**
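A minimal sketch of the pattern in this diff: a checked `IOException` from opening a link is turned into an unchecked `IllegalStateException` with a clearer message, so the constructor no longer declares `throws IOException`. `Opener` and `openOrFail` are hypothetical stand-ins for the transport call. Unlike the diff, the sketch also passes `e` as the cause, which keeps the original stack trace in the logs.

```java
import java.io.IOException;

public class ConnectSketch {
  /** Hypothetical stand-in for transport.open(...), which declares a checked IOException. */
  interface Opener {
    String open(String address) throws IOException;
  }

  /** Opens a link, turning a checked failure into a descriptive unchecked one. */
  static String openOrFail(final Opener opener, final String address) {
    try {
      return opener.open(address);
    } catch (final IOException e) {
      // Passing e as the cause keeps the original stack trace in the log.
      throw new IllegalStateException("Failed to set up an RPC connection to the client at "
          + address + ". A failure at the client side is suspected.", e);
    }
  }

  public static void main(final String[] args) {
    try {
      openOrFail(address -> {
        throw new IOException("Connection refused");
      }, "localhost:5000");
    } catch (final IllegalStateException e) {
      System.out.println(e.getCause().getMessage()); // prints Connection refused
    }
  }
}
```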


 




[GitHub] taegeonum commented on issue #165: [NEMO-221] Confusing error messages for a failure at the client-side …

2018-12-10 Thread GitBox
taegeonum commented on issue #165: [NEMO-221] Confusing error messages for a 
failure at the client-side …
URL: https://github.com/apache/incubator-nemo/pull/165#issuecomment-446023531
 
 
   LGTM. I'm merging it. Thanks @yunseong !




[GitHub] seojangho closed pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment

2018-12-09 Thread GitBox
seojangho closed pull request #175: [NEMO-310] Exclude beam/spark examples jars 
from deployment
URL: https://github.com/apache/incubator-nemo/pull/175
 
 
   




[GitHub] johnyangk closed pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode

2018-12-09 Thread GitBox
johnyangk closed pull request #177: [NEMO-316] CombinePartial/FinalTransform is 
not disabled in streaming mode
URL: https://github.com/apache/incubator-nemo/pull/177
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 3e000d37f..7c6335751 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -341,7 +341,7 @@ private static void flattenTranslator(final PipelineTranslationContext ctx,
 
 // Check if the partial combining optimization can be applied.
 // If not, simply use the default Combine implementation by entering into it.
-if (!isGlobalWindow(beamNode, ctx.getPipeline())) {
+if (!(isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline()))) {
   // TODO #263: Partial Combining for Beam Streaming
   return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
 }
@@ -500,6 +500,7 @@ private static Transform createGBKTransform(
 }
   }
 
+
   /**
* @param beamNode the beam node to be translated.
* @param pipeline pipeline.
@@ -511,4 +512,16 @@ private static boolean isGlobalWindow(final TransformHierarchy.Node beamNode, fi
   Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
 return mainInput.getWindowingStrategy().getWindowFn() instanceof GlobalWindows;
   }
+
+  /**
+   * @param beamNode the beam node to be translated.
+   * @param pipeline pipeline.
+   * @return true if the main input bounded.
+   */
+  private static boolean isMainInputBounded(final TransformHierarchy.Node beamNode, final Pipeline pipeline) {
+final AppliedPTransform pTransform = beamNode.toAppliedPTransform(pipeline);
+final PCollection mainInput = (PCollection)
+  Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+return mainInput.isBounded() == PCollection.IsBounded.BOUNDED;
+  }
 }


 




[GitHub] taegeonum opened a new pull request #177: [NEMO-316] CombinePartial/FinalTransform is not disabled in streaming mode

2018-12-09 Thread GitBox
taegeonum opened a new pull request #177: [NEMO-316] 
CombinePartial/FinalTransform is not disabled in streaming mode
URL: https://github.com/apache/incubator-nemo/pull/177
 
 
   JIRA: [NEMO-316: CombinePartial/FinalTransform is not disabled in streaming 
mode](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-316)
   
   **Major changes:**
   - Check whether the main input is bounded or not. 
   
   The solution is taken from @johnyangk's nexmark branch.
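A minimal sketch of the decision this change makes, with the Beam types replaced by hypothetical enums so it runs on its own: `Boundedness` stands in for `PCollection.IsBounded`, `Windowing` for the `GlobalWindows` check, and `DEFAULT_COMBINE` for returning `ENTER_TRANSFORM`. Before the fix only the windowing was checked, so an unbounded (streaming) input in the global window still got the partial/final combine transforms.

```java
public class CombineTranslationSketch {
  /** Hypothetical simplifications of Beam's PCollection.IsBounded and the window-fn check. */
  enum Boundedness { BOUNDED, UNBOUNDED }

  enum Windowing { GLOBAL, NON_GLOBAL }

  /** Hypothetical translation outcomes. */
  enum Translation { PARTIAL_AND_FINAL_COMBINE, DEFAULT_COMBINE }

  /** Partial combining is used only for a bounded, globally windowed main input. */
  static Translation chooseCombineTranslation(final Boundedness boundedness, final Windowing windowing) {
    final boolean canPartiallyCombine =
        boundedness == Boundedness.BOUNDED && windowing == Windowing.GLOBAL;
    return canPartiallyCombine ? Translation.PARTIAL_AND_FINAL_COMBINE : Translation.DEFAULT_COMBINE;
  }

  public static void main(final String[] args) {
    // Print the full decision table.
    for (final Boundedness b : Boundedness.values()) {
      for (final Windowing w : Windowing.values()) {
        System.out.println(b + " + " + w + " -> " + chooseCombineTranslation(b, w));
      }
    }
  }
}
```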
   
   




[GitHub] taegeonum closed pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error

2018-12-09 Thread GitBox
taegeonum closed pull request #176: [NEMO-315] Remove checkstyle settings for 
javadoc error
URL: https://github.com/apache/incubator-nemo/pull/176
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle.xml b/checkstyle.xml
index e3d0b2869..c7f876114 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -77,10 +77,6 @@ under the License.
 
 
 
-
-
-
-
 
 
 
@@ -88,7 +84,7 @@ under the License.
 
 
 
-
+
 
 
 
diff --git a/suppressions.xml b/suppressions.xml
deleted file mode 100644
index f0f4d8cbd..0
--- a/suppressions.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-
-
-
-https://checkstyle.org/dtds/suppressions_1_2.dtd;>
-
-
-
-


 




[GitHub] taegeonum commented on issue #176: [NEMO-315] Remove checkstyle settings for javadoc error

2018-12-09 Thread GitBox
taegeonum commented on issue #176: [NEMO-315] Remove checkstyle settings for 
javadoc error
URL: https://github.com/apache/incubator-nemo/pull/176#issuecomment-445660930
 
 
   The build succeeds on my local macOS machine! I will merge it. Thanks!




[GitHub] jeongyooneo opened a new pull request #176: [NEMO-315] Remove checkstyle settings for javadoc error

2018-12-09 Thread GitBox
jeongyooneo opened a new pull request #176: [NEMO-315] Remove checkstyle 
settings for javadoc error
URL: https://github.com/apache/incubator-nemo/pull/176
 
 
   JIRA: [NEMO-315: Remove javadoc checkstyle 
suppression](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-315)
   
   **Major changes:**
   - N/A
   
   **Minor changes to note:**
   - The checkstyle settings that suppress javadoc errors for those modules are removed because of reported build failures.
   
   **Tests for the changes:**
   - N/A
   
   **Other comments:**
   - N/A




[GitHub] jooykim opened a new pull request #175: [NEMO-310] Exclude beam/spark examples jars from deployment

2018-12-09 Thread GitBox
jooykim opened a new pull request #175: [NEMO-310] Exclude beam/spark examples 
jars from deployment
URL: https://github.com/apache/incubator-nemo/pull/175
 
 
   JIRA: [NEMO-310: Exclude 'examples' JAR from release 
artifacts](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-310)
   
   **Major changes:**
   - Modifies examples/beam/pom.xml and examples/spark/pom.xml to exclude them 
from deployment to Nexus.
   
   **Minor changes to note:**
   - N/A 
   
   **Tests for the changes:**
   - I've verified that it works by running `mvn deploy` against Apache's Nexus repository.
   
   **Other comments:**
   - N/A
   
   




[GitHub] jeongyooneo closed pull request #169: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings

2018-12-05 Thread GitBox
jeongyooneo closed pull request #169: [NEMO-10] Handle Method Javadocs 
Requirements for Checkstyle Warnings
URL: https://github.com/apache/incubator-nemo/pull/169
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
index 4d1c9dafe..e912125f7 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -39,8 +39,8 @@
 
   /**
* Default constructor.
-   * @param aggregatedDynOptData initial dynamic optimization data.
-   * @param dynOptDataAggregator aggregator for the dynamic optimization data.
+   * @param aggregatedDynOptData per-stage aggregated dynamic optimization data.
+   * @param dynOptDataAggregator aggregator to use.
*/
   public AggregateMetricTransform(final O aggregatedDynOptData,
   final BiFunction dynOptDataAggregator) {
diff --git 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
index 05254bc76..39dc6e036 100644
--- 
a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
+++ 
b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -41,8 +41,8 @@
 
   /**
* MetricCollectTransform constructor.
-   * @param dynOptData dynamic optimization data.
-   * @param dynOptDataCollector for the data.
+   * @param dynOptData per-task dynamic optimization data.
+   * @param dynOptDataCollector that collects the data.
* @param closer callback function to be invoked when closing the transform.
*/
   public MetricCollectTransform(final O dynOptData,
diff --git a/suppressions.xml b/suppressions.xml
index c1e89929c..f0f4d8cbd 100644
--- a/suppressions.xml
+++ b/suppressions.xml
@@ -24,5 +24,5 @@ under the License.
 
 
 
+  
files="[\\/]runtime[\\/]|[\\/]compiler[\\/]|[\\/]client[\\/]|[\\/]examples[\\/]"/>
 


 




[GitHub] jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings

2018-12-05 Thread GitBox
jeongyooneo closed pull request #170: [NEMO-10] Handle Method Javadocs 
Requirements for Checkstyle Warnings
URL: https://github.com/apache/incubator-nemo/pull/170
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle.xml b/checkstyle.xml
index c7f876114..e3d0b2869 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -77,6 +77,10 @@ under the License.
 
 
 
+
+
+
+
 
 
 
@@ -84,7 +88,7 @@ under the License.
 
 
 
-
+
 
 
 
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index f95bc73fc..e849acae3 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -438,6 +438,7 @@ private static Configuration getDeployModeConf(final Configuration jobConf) thro
* @param jobConf   job configuration to get json path.
* @param pathParameter named parameter represents path to the json file, or an empty string
* @param contentsParameter named parameter represents contents of the file
+   * @param defaultContent the default configuration
* @return configuration with contents of the file, or an empty string as value for {@code contentsParameter}
* @throws InjectionException exception while injection.
*/
diff --git a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
index 0de21378e..8d7f166d1 100644
--- a/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/DataSkewMetricFactory.java
@@ -28,11 +28,15 @@
 
   /**
* Default constructor.
+   * @param metric metric.
*/
   public DataSkewMetricFactory(final Map metric) {
 this.metric = metric;
   }
 
+  /**
+   * @return the metric.
+   */
   public Map getMetric() {
 return metric;
   }
diff --git a/common/src/main/java/org/apache/nemo/common/HashRange.java b/common/src/main/java/org/apache/nemo/common/HashRange.java
index 1ca00078b..6a5f195cc 100644
--- a/common/src/main/java/org/apache/nemo/common/HashRange.java
+++ b/common/src/main/java/org/apache/nemo/common/HashRange.java
@@ -33,6 +33,7 @@
* Private constructor.
* @param rangeBeginInclusive point at which the hash range starts (inclusive).
* @param rangeEndExclusive point at which the hash range ends (exclusive).
+   * @param isSkewed whether or not the range is skewed
*/
   private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) {
 if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) {
@@ -53,6 +54,7 @@ public static HashRange all() {
   /**
* @param rangeStartInclusive the start of the range (inclusive)
* @param rangeEndExclusive   the end of the range (exclusive)
+   * @param isSkewed  whether or not the range is skewed
* @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive})
*/
   public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) {
@@ -119,6 +121,9 @@ public boolean equals(final Object o) {
 return true;
   }
 
+  /**
+   * @return the hash value.
+   */
   @Override
   public int hashCode() {
 return Arrays.hashCode(new Object[] {
@@ -128,6 +133,9 @@ public int hashCode() {
 });
   }
 
+  /**
+   * @return whether or not the range is skewed.
+   */
   public boolean isSkewed() {
 return isSkewed;
   }
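The HashRange Javadoc in this diff describes a half-open range [rangeBeginInclusive, rangeEndExclusive) with an isSkewed flag, and the constructor rejects negative bounds. The following is a rough standalone sketch of those semantics, not Nemo's class: SimpleHashRange, length() and includes() are hypothetical names chosen only for illustration.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a half-open hash range [begin, end) with a skew flag. */
final class SimpleHashRange {
  private final int begin;      // inclusive
  private final int end;        // exclusive
  private final boolean skewed; // whether the range was marked as skewed

  SimpleHashRange(final int begin, final int end, final boolean skewed) {
    // Mirrors the bound check visible in the diff: negative bounds are rejected.
    if (begin < 0 || end < 0) {
      throw new IllegalArgumentException("Bounds must be non-negative");
    }
    this.begin = begin;
    this.end = end;
    this.skewed = skewed;
  }

  /** Number of hash values covered by [begin, end). */
  int length() {
    return end - begin;
  }

  /** True if hash lies in [begin, end): begin is included, end is not. */
  boolean includes(final int hash) {
    return hash >= begin && hash < end;
  }

  boolean isSkewed() {
    return skewed;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SimpleHashRange)) {
      return false;
    }
    final SimpleHashRange that = (SimpleHashRange) o;
    return begin == that.begin && end == that.end && skewed == that.skewed;
  }

  @Override
  public int hashCode() {
    // Hashes all fields together, in the same Arrays.hashCode style as the diff.
    return Arrays.hashCode(new Object[] {begin, end, skewed});
  }
}
```

Because the range is half-open, adjacent ranges such as [0, 5) and [5, 10) never share a hash value.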
diff --git a/common/src/main/java/org/apache/nemo/common/StateMachine.java b/common/src/main/java/org/apache/nemo/common/StateMachine.java
index 24180e23e..28740a95f 100644
--- a/common/src/main/java/org/apache/nemo/common/StateMachine.java
+++ b/common/src/main/java/org/apache/nemo/common/StateMachine.java
@@ -61,7 +61,7 @@ public synchronized void checkState(final Enum expectedCurrentState) {
* Sets the current state as a certain state.
*
* @param state a state
-   * @throws RuntimeException if the state is unknown state, or the transition
+   * @throws IllegalStateTransitionException the state is unknown state, or the transition
* from the current state to the specified state is illegal
*/
   public synchronized void setState(final Enum state) throws IllegalStateTransitionException {
@@ -86,7 +86,7 @@ public synchronized void setState(final Enum state) throws IllegalStateTransitio
* @param state a state
* @return {@code true} if successful. {@code false} indicates that
* the actual value was not equal to the expected value.
-   * 
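The StateMachine Javadoc changes above state that setState throws IllegalStateTransitionException for an unknown state or an illegal transition, and that the method in the second hunk returns false when the actual state differs from the expected one. A minimal self-contained sketch of that contract follows; SketchState, SimpleStateMachine, IllegalTransitionException and the transition table are all hypothetical and only stand in for Nemo's classes.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical checked exception standing in for IllegalStateTransitionException. */
class IllegalTransitionException extends Exception {
  IllegalTransitionException(final String message) {
    super(message);
  }
}

/** Hypothetical states used only for this sketch. */
enum SketchState { READY, EXECUTING, COMPLETE, FAILED }

/** Minimal state machine: legal transitions are declared up front and checked on every change. */
final class SimpleStateMachine {
  private final Map<SketchState, Set<SketchState>> legal = new EnumMap<>(SketchState.class);
  private SketchState current;

  SimpleStateMachine(final SketchState initial) {
    this.current = initial;
    legal.put(SketchState.READY, EnumSet.of(SketchState.EXECUTING));
    legal.put(SketchState.EXECUTING, EnumSet.of(SketchState.COMPLETE, SketchState.FAILED));
    legal.put(SketchState.COMPLETE, EnumSet.noneOf(SketchState.class));
    legal.put(SketchState.FAILED, EnumSet.of(SketchState.READY));
  }

  synchronized SketchState getCurrentState() {
    return current;
  }

  /** Moves to state, or throws if the state is unknown or the transition is not declared. */
  synchronized void setState(final SketchState state) throws IllegalTransitionException {
    final Set<SketchState> allowed = legal.get(current);
    if (allowed == null || !allowed.contains(state)) {
      throw new IllegalTransitionException(current + " -> " + state + " is illegal");
    }
    current = state;
  }

  /** Sets state only if the current state equals expected; returns false otherwise. */
  synchronized boolean compareAndSetState(final SketchState expected, final SketchState state)
      throws IllegalTransitionException {
    if (current != expected) {
      return false;
    }
    setState(state);
    return true;
  }
}
```

Using a checked exception forces every caller of setState to decide explicitly what an illegal transition means for it, which is the point of replacing RuntimeException in the @throws tag.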

[GitHub] johnyangk closed pull request #174: [NEMO-311] Add StreamingPolicy

2018-12-05 Thread GitBox
johnyangk closed pull request #174: [NEMO-311] Add StreamingPolicy
URL: https://github.com/apache/incubator-nemo/pull/174
 
 
   


diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java
new file mode 100644
index 0..9bdf2d4ed
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/StreamingPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.policy;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.PipeTransferForAllEdgesPass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass;
+import org.apache.reef.tang.Injector;
+
+/**
+ * Streaming policy.
+ */
+public final class StreamingPolicy implements Policy {
+  private final Policy policy;
+
+  /**
+   * Default constructor.
+   */
+  public StreamingPolicy() {
+    final PolicyBuilder builder = new PolicyBuilder();
+    builder.registerCompileTimePass(new DefaultCompositePass());
+    builder.registerCompileTimePass(new PipeTransferForAllEdgesPass());
+    this.policy = builder.build();
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
+    return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+  }
+
+  @Override
+  public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
+    this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+  }
+}
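StreamingPolicy does no work of its own: it registers DefaultCompositePass and then PipeTransferForAllEdgesPass with a PolicyBuilder, and delegates both runCompileTimeOptimization and registerRunTimeOptimizations to the built policy. The sketch below models that delegation with plain Java types to show why registration order can matter, assuming (our assumption, not something the PR states) that a later annotating pass overwrites a property set by an earlier one. SimplePolicyBuilder, SimplePolicy, SimpleStreamingPolicy and the "transfer" property are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical builder that keeps compile-time passes in registration order. */
final class SimplePolicyBuilder {
  private final List<UnaryOperator<Map<String, String>>> passes = new ArrayList<>();

  SimplePolicyBuilder registerCompileTimePass(final UnaryOperator<Map<String, String>> pass) {
    passes.add(pass);
    return this;
  }

  SimplePolicy build() {
    return new SimplePolicy(passes);
  }
}

/** Hypothetical policy that applies each pass, in order, to one edge's properties. */
final class SimplePolicy {
  private final List<UnaryOperator<Map<String, String>>> passes;

  SimplePolicy(final List<UnaryOperator<Map<String, String>>> passes) {
    this.passes = Collections.unmodifiableList(new ArrayList<>(passes));
  }

  Map<String, String> runCompileTimeOptimization(final Map<String, String> edgeProperties) {
    Map<String, String> result = new HashMap<>(edgeProperties);
    for (final UnaryOperator<Map<String, String>> pass : passes) {
      result = pass.apply(result);
    }
    return result;
  }
}

/** Hypothetical counterpart of StreamingPolicy: builds once, then only delegates. */
final class SimpleStreamingPolicy {
  private final SimplePolicy policy;

  SimpleStreamingPolicy() {
    this.policy = new SimplePolicyBuilder()
        // stand-in for a default pass that picks a stored (batch-style) transfer
        .registerCompileTimePass(props -> with(props, "transfer", "stored"))
        // stand-in for a pass marking every edge as pipelined; registered last, so it wins
        .registerCompileTimePass(props -> with(props, "transfer", "pipe"))
        .build();
  }

  Map<String, String> runCompileTimeOptimization(final Map<String, String> edgeProperties) {
    return policy.runCompileTimeOptimization(edgeProperties);
  }

  /** Returns a copy of props with key set to value. */
  private static Map<String, String> with(final Map<String, String> props,
                                          final String key, final String value) {
    final Map<String, String> copy = new HashMap<>(props);
    copy.put(key, value);
    return copy;
  }
}
```

Keeping the wrapper final and delegation-only means a new policy is just a different ordered list of passes.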


 




[GitHub] johnyangk closed pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
johnyangk closed pull request #172: [NEMO-270] Test different triggers in 
GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172
 
 
   


diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 06dde5861..a4315082a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -24,6 +24,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -240,15 +241,6 @@ private void triggerTimers(final K key,
       // The DoFnRunner interface requires WindowedValue,
       // but this windowed value is actually not used in the ReduceFnRunner internal.
       getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
-      // output watermark
-      // we set output watermark to the minimum of the timer data
-      long keyOutputTimestamp = Long.MAX_VALUE;
-      for (final TimerInternals.TimerData timer : timerDataList) {
-        keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis());
-      }
-
-      timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
     }
   }
 
@@ -349,14 +341,21 @@ public TimerInternals timerInternalsForKey(final K key) {
 
     @Override
     public void emit(final WindowedValue>> output) {
-      // adds the output timestamp to the watermark hold of each key
-      // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
-      // TODO #270: consider early firing
-      // TODO #270: This logic may not be applied to early firing outputs
-      keyAndWatermarkHoldMap.put(output.getValue().getKey(),
-        new Watermark(output.getTimestamp().getMillis() + 1));
+
+      // The watermark advances only in ON_TIME
+      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
+        final K key = output.getValue().getKey();
+        final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+          inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+        keyAndWatermarkHoldMap.put(key,
+          // adds the output timestamp to the watermark hold of each key
+          // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
+          new Watermark(output.getTimestamp().getMillis() + 1));
+        timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+      }
       outputCollector.emit(output);
     }
+
     @Override
     public void emitWatermark(final Watermark watermark) {
       outputCollector.emitWatermark(watermark);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 474c79c0d..f0749c0c0 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -33,16 +33,20 @@
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-import static java.util.Collections.emptyList;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.EARLY;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-// TODO #270: Test different triggers
 public final class GroupByKeyAndWindowDoFnTransformTest {
-
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
   private 

[GitHub] taegeonum commented on issue #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
taegeonum commented on issue #172: [NEMO-270] Test different triggers in 
GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#issuecomment-443989001
 
 
   @johnyangk I've addressed your comments. Thanks!




[GitHub] taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
taegeonum commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238541422
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##
 @@ -248,4 +252,120 @@ public void test() {
 
 doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+final Duration lateness = Duration.standardSeconds(1);
+final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+  // early firing
+  .withEarlyFirings(
+AfterProcessingTime
+  .pastFirstElementInPane()
+  // early firing 1 sec after receiving an element
+  .plusDelayOf(Duration.millis(1000)))
+  // late firing: Fire on any late data.
+  .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+final FixedWindows window = (FixedWindows) Window.into(
+  FixedWindows.of(Duration.standardSeconds(5)))
+  // lateness
+  .withAllowedLateness(lateness)
+  .triggering(trigger)
+  .accumulatingFiredPanes().getWindowFn();
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final GroupByKeyAndWindowDoFnTransform doFnTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+WindowingStrategy.of(window).withTrigger(trigger)
+  .withMode(ACCUMULATING_FIRED_PANES)
 
 Review comment:
   I will add a TODO (but I've tested it without accumulating mode, and it works well).
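The test's assertions show what ACCUMULATING_FIRED_PANES means in practice: each firing repeats everything buffered so far for the window (hello; then hello, world; then hello, world, !!). Without accumulation, each firing would carry only the elements that arrived since the previous one. A minimal sketch of the difference follows; PaneBuffer and AccumulationStyle are hypothetical names, and windows, keys and triggers are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulation styles, modeled on the idea behind Beam's AccumulationMode. */
enum AccumulationStyle { ACCUMULATING, DISCARDING }

/** Buffers values for a single key and window and emits a pane on each firing. */
final class PaneBuffer {
  private final AccumulationStyle style;
  private final List<String> buffer = new ArrayList<>();

  PaneBuffer(final AccumulationStyle style) {
    this.style = style;
  }

  void add(final String value) {
    buffer.add(value);
  }

  /** Emits the current pane; the discarding style clears the buffer after each firing. */
  List<String> fire() {
    final List<String> pane = new ArrayList<>(buffer);
    if (style == AccumulationStyle.DISCARDING) {
      buffer.clear();
    }
    return pane;
  }
}
```

Feeding hello, world and !! with a firing after each value yields [hello], [hello, world], [hello, world, !!] in the accumulating style and [hello], [world], [!!] in the discarding style.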




[GitHub] seojangho closed pull request #173: [NEMO-307] Enable integration test

2018-12-03 Thread GitBox
seojangho closed pull request #173: [NEMO-307] Enable integration test
URL: https://github.com/apache/incubator-nemo/pull/173
 
 
   


diff --git a/pom.xml b/pom.xml
index 768ade95f..b77c67f9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,19 +163,6 @@ under the License.
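       <artifactId>maven-surefire-plugin</artifactId>
       <version>${surefire.version}</version>
     </plugin>
-    <plugin>
-      <groupId>org.apache.maven.plugins</groupId>
-      <artifactId>maven-failsafe-plugin</artifactId>
-      <version>${surefire.version}</version>
-      <executions>
-        <execution>
-          <goals>
-            <goal>integration-test</goal>
-            <goal>verify</goal>
-          </goals>
-        </execution>
-      </executions>
-    </plugin>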
 
 
 
@@ -203,6 +190,19 @@ under the License.
 
 
 
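+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <version>${surefire.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>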


 




[GitHub] taegeonum commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
taegeonum commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238541138
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -351,12 +343,17 @@ public TimerInternals timerInternalsForKey(final K key) {
 public void emit(final WindowedValue>> output) {
   // adds the output timestamp to the watermark hold of each key
   // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
-  // TODO #270: consider early firing
-  // TODO #270: This logic may not be applied to early firing outputs
-  keyAndWatermarkHoldMap.put(output.getValue().getKey(),
-new Watermark(output.getTimestamp().getMillis() + 1));
+  if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
 
 Review comment:
   EARLY firings do not guarantee that the watermark has advanced, so we should exclude early results.
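The rule under review can be restated outside Beam and Nemo: only an ON_TIME output moves the per-key watermark hold, and the hold is the output timestamp plus one, because a window [0, 5000) emits with timestamp 4999. The sketch below additionally assumes (our assumption) that the transform's output watermark is the minimum hold across keys. FiringTiming and WatermarkHolds are hypothetical stand-ins for PaneInfo.Timing and keyAndWatermarkHoldMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical mirror of the pane timings EARLY, ON_TIME and LATE. */
enum FiringTiming { EARLY, ON_TIME, LATE }

/** Tracks a per-key watermark hold that only ON_TIME outputs may update. */
final class WatermarkHolds {
  private final Map<String, Long> holdByKey = new HashMap<>();

  /** Records an emitted output with its key, output timestamp (ms) and pane timing. */
  void onEmit(final String key, final long outputTimestampMillis, final FiringTiming timing) {
    // As in the diff, EARLY and LATE outputs leave the hold untouched.
    if (timing == FiringTiming.ON_TIME) {
      // +1: a window [0, 5000) emits at 4999, so the hold becomes 5000.
      holdByKey.put(key, outputTimestampMillis + 1);
    }
  }

  Optional<Long> holdFor(final String key) {
    return Optional.ofNullable(holdByKey.get(key));
  }

  /** Assumed output watermark: the smallest hold over all keys, if any key has one. */
  Optional<Long> minHold() {
    return holdByKey.values().stream().min(Long::compare);
  }
}
```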




[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
johnyangk commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238537290
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 ##
 @@ -351,12 +343,17 @@ public TimerInternals timerInternalsForKey(final K key) {
 public void emit(final WindowedValue>> output) {
   // adds the output timestamp to the watermark hold of each key
   // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
-  // TODO #270: consider early firing
-  // TODO #270: This logic may not be applied to early firing outputs
-  keyAndWatermarkHoldMap.put(output.getValue().getKey(),
-new Watermark(output.getTimestamp().getMillis() + 1));
+  if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
 
 Review comment:
   Can you add a brief comment on why we advance the output watermark only for ON_TIME outputs?
   Excluding LATE seems obvious (we don't want to advance 'backwards'), but what about EARLY?
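One way to see why EARLY is excluded is to classify each firing by where the input watermark stands relative to the window's max timestamp. The sketch below is a simplified model, not Beam's implementation: a firing while the watermark is at or before the window's max timestamp is EARLY, the first firing after the watermark passes it is ON_TIME, and any later one is LATE. With the test's numbers, the EARLY panes fire at watermarks 2 and 5, far inside the window [0, 5000), so an EARLY output says nothing about watermark progress. PaneTiming and PaneTimingClassifier are hypothetical names.

```java
/** Hypothetical pane timings, named after Beam's PaneInfo.Timing values. */
enum PaneTiming { EARLY, ON_TIME, LATE }

/** Simplified per-window classifier; it ignores Beam's finer rules about pane history. */
final class PaneTimingClassifier {
  private final long windowMaxTimestamp; // e.g. 4999 for the window [0, 5000)
  private boolean onTimeFired = false;

  PaneTimingClassifier(final long windowMaxTimestamp) {
    this.windowMaxTimestamp = windowMaxTimestamp;
  }

  /** Classifies a firing that happens while the input watermark is at inputWatermark. */
  PaneTiming classify(final long inputWatermark) {
    if (inputWatermark <= windowMaxTimestamp) {
      // The watermark has not passed the window end, e.g. a processing-time early firing.
      return PaneTiming.EARLY;
    }
    if (!onTimeFired) {
      onTimeFired = true;
      return PaneTiming.ON_TIME;
    }
    return PaneTiming.LATE;
  }
}
```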




[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
johnyangk commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238537837
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##
 @@ -248,4 +252,120 @@ public void test() {
 
 doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+final Duration lateness = Duration.standardSeconds(1);
+final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+  // early firing
+  .withEarlyFirings(
+AfterProcessingTime
+  .pastFirstElementInPane()
+  // early firing 1 sec after receiving an element
+  .plusDelayOf(Duration.millis(1000)))
+  // late firing: Fire on any late data.
+  .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+final FixedWindows window = (FixedWindows) Window.into(
+  FixedWindows.of(Duration.standardSeconds(5)))
+  // lateness
+  .withAllowedLateness(lateness)
+  .triggering(trigger)
+  .accumulatingFiredPanes().getWindowFn();
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final GroupByKeyAndWindowDoFnTransform doFnTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+WindowingStrategy.of(window).withTrigger(trigger)
+  .withMode(ACCUMULATING_FIRED_PANES)
+.withAllowedLateness(lateness),
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER),
+DisplayData.none());
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector>> oc = new TestOutputCollector();
+doFnTransform.prepare(context, oc);
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+// early firing is not related to the watermark progress
+doFnTransform.onWatermark(new Watermark(2));
+assertEquals(1, oc.outputs.size());
+assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+oc.outputs.clear();
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+// EARLY firing... waiting >= 1 sec
+try {
+  Thread.sleep(2000);
+} catch (InterruptedException e) {
+  e.printStackTrace();
+}
+
+doFnTransform.onWatermark(new Watermark(5));
 
 Review comment:
   I suppose this effectively advances processing time?
   Can you add a comment about that?
   Maybe also check that `oc.outputs` is empty prior to this line?
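If, as the reviewer suspects, the transform checks processing-time timers when onWatermark() is called, then the Thread.sleep(2000) followed by onWatermark(new Watermark(5)) is what makes the one-second early-firing timer fire. The sketch below models that with an injectable clock so no real sleep is needed. ProcessingTimeFiringSketch and its members are hypothetical; they do not reproduce Nemo's InMemoryTimerInternals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical operator with a single early-firing processing-time timer (accumulating). */
final class ProcessingTimeFiringSketch {
  private static final long NO_TIMER = Long.MAX_VALUE;

  private final LongSupplier clock;   // processing-time source in ms, injectable for tests
  private final long delayMillis;     // e.g. 1000 for plusDelayOf(Duration.millis(1000))
  private final List<String> buffer = new ArrayList<>();
  private final List<List<String>> outputs = new ArrayList<>();
  private long timerAtMillis = NO_TIMER;

  ProcessingTimeFiringSketch(final LongSupplier clock, final long delayMillis) {
    this.clock = clock;
    this.delayMillis = delayMillis;
  }

  void onData(final String value) {
    buffer.add(value);
    if (timerAtMillis == NO_TIMER) {
      // The first element of a pane arms the timer (pastFirstElementInPane).
      timerAtMillis = clock.getAsLong() + delayMillis;
    }
  }

  /** Event-time handling is omitted; this call only doubles as a processing-time tick. */
  void onWatermark(final long watermarkMillis) {
    if (clock.getAsLong() >= timerAtMillis) {
      outputs.add(new ArrayList<>(buffer)); // accumulating: the buffer is kept
      timerAtMillis = NO_TIMER;
    }
  }

  List<List<String>> outputs() {
    return outputs;
  }
}
```

In a test, a mutable clock (for example a long[] read by a lambda) can be moved past the delay, outputs() asserted empty before onWatermark is called, and then asserted to hold exactly one pane afterwards, which is the extra check the reviewer asks for.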




[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
johnyangk commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238539403
 
 

 ##
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##
 @@ -248,4 +252,120 @@ public void test() {
 
 doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+final Duration lateness = Duration.standardSeconds(1);
+final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+  // early firing
+  .withEarlyFirings(
+AfterProcessingTime
+  .pastFirstElementInPane()
+  // early firing 1 sec after receiving an element
+  .plusDelayOf(Duration.millis(1000)))
+  // late firing: Fire on any late data.
+  .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+final FixedWindows window = (FixedWindows) Window.into(
+  FixedWindows.of(Duration.standardSeconds(5)))
+  // lateness
+  .withAllowedLateness(lateness)
+  .triggering(trigger)
+  .accumulatingFiredPanes().getWindowFn();
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final GroupByKeyAndWindowDoFnTransform doFnTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+WindowingStrategy.of(window).withTrigger(trigger)
+  .withMode(ACCUMULATING_FIRED_PANES)
+.withAllowedLateness(lateness),
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER),
+DisplayData.none());
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector>> oc = new TestOutputCollector();
+doFnTransform.prepare(context, oc);
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+// early firing is not related to the watermark progress
+doFnTransform.onWatermark(new Watermark(2));
+assertEquals(1, oc.outputs.size());
+assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+oc.outputs.clear();
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+// EARLY firing... waiting >= 1 sec
+try {
+  Thread.sleep(2000);
+} catch (InterruptedException e) {
+  e.printStackTrace();
+}
+
+doFnTransform.onWatermark(new Watermark(5));
+assertEquals(1, oc.outputs.size());
+assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+// ACCUMULATION MODE
+checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+LOG.info("Output: {}", oc.outputs.get(0));
+oc.outputs.clear();
+
+// ON TIME
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+doFnTransform.onWatermark(new Watermark(5001));
+assertEquals(1, oc.outputs.size());
+assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+// ACCUMULATION MODE
+checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
+oc.outputs.clear();
+
+// LATE DATA
+// actual window: [0-5000)
+// allowed lateness: 1000 (ms)
+// current watermark: 5001
+// data: 4500
+// the data timestamp + allowed lateness > current watermark,
+// so it should be accumulated to the prev window
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello!"), new Instant(4500),
 
 Review comment:
   hello -> hello again
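The LATE DATA comment in the test reasons with the element timestamp plus the allowed lateness. In Beam, as we understand it, acceptance of late data is decided per window: the window expires once the input watermark passes the window's max timestamp plus the allowed lateness. For the test's numbers both readings agree (4500 + 1000 = 5500 and 4999 + 1000 = 5999 both exceed 5001). The sketch below checks the window-based rule for fixed windows with zero offset; LatenessCheck and its methods are hypothetical.

```java
/** Simplified lateness check for fixed windows with zero offset; names are hypothetical. */
final class LatenessCheck {
  private LatenessCheck() {
  }

  /** Max timestamp (ms) of the fixed window [start, start + size) containing the timestamp. */
  static long windowMaxTimestamp(final long timestampMillis, final long sizeMillis) {
    final long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    return start + sizeMillis - 1;
  }

  /** True if the window has expired: max timestamp plus allowed lateness is before the watermark. */
  static boolean isDroppable(final long timestampMillis, final long sizeMillis,
                             final long allowedLatenessMillis, final long inputWatermarkMillis) {
    return windowMaxTimestamp(timestampMillis, sizeMillis) + allowedLatenessMillis
        < inputWatermarkMillis;
  }
}
```

With a 5000 ms window and 1000 ms of allowed lateness, the element at 4500 is still accepted at watermark 5001 and would be dropped once the watermark passes 5999.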
   
    




[GitHub] johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
johnyangk commented on a change in pull request #172: [NEMO-270] Test different 
triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238538386
 
 

 ##
 File path: compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##
 @@ -248,4 +252,120 @@ public void test() {
 
 doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+final Duration lateness = Duration.standardSeconds(1);
+final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
+  // early firing
+  .withEarlyFirings(
+AfterProcessingTime
+  .pastFirstElementInPane()
+  // early firing 1 sec after receiving an element
+  .plusDelayOf(Duration.millis(1000)))
+  // late firing: Fire on any late data.
+  .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+final FixedWindows window = (FixedWindows) Window.into(
+  FixedWindows.of(Duration.standardSeconds(5)))
+  // lateness
+  .withAllowedLateness(lateness)
+  .triggering(trigger)
+  .accumulatingFiredPanes().getWindowFn();
+
+final TupleTag outputTag = new TupleTag<>("main-output");
+final GroupByKeyAndWindowDoFnTransform doFnTransform =
+  new GroupByKeyAndWindowDoFnTransform(
+NULL_OUTPUT_CODERS,
+outputTag,
+WindowingStrategy.of(window).withTrigger(trigger)
+  .withMode(ACCUMULATING_FIRED_PANES)
+.withAllowedLateness(lateness),
+PipelineOptionsFactory.as(NemoPipelineOptions.class),
+SystemReduceFn.buffering(NULL_INPUT_CODER),
+DisplayData.none());
+
+
+final Transform.Context context = mock(Transform.Context.class);
+final TestOutputCollector>> oc = new TestOutputCollector();
+doFnTransform.prepare(context, oc);
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
+
+// early firing is not related to the watermark progress
+doFnTransform.onWatermark(new Watermark(2));
+assertEquals(1, oc.outputs.size());
+assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+oc.outputs.clear();
+
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+// EARLY firing... waiting >= 1 sec
+try {
+  Thread.sleep(2000);
+} catch (InterruptedException e) {
+  e.printStackTrace();
+}
+
+doFnTransform.onWatermark(new Watermark(5));
+assertEquals(1, oc.outputs.size());
+assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+// ACCUMULATION MODE
+checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
+LOG.info("Output: {}", oc.outputs.get(0));
+oc.outputs.clear();
+
+// ON TIME
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
+doFnTransform.onWatermark(new Watermark(5001));
+assertEquals(1, oc.outputs.size());
+assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+// ACCUMULATION MODE
+checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
+oc.outputs.clear();
+
+// LATE DATA
+// actual window: [0-5000)
+// allowed lateness: 1000 (ms)
+// current watermark: 5001
+// data: 4500
+// the data timestamp + allowed lateness > current watermark,
+// so it should be accumulated to the prev window
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello!"), new Instant(4500),
+  window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
+doFnTransform.onWatermark(new Watermark(6000));
+assertEquals(1, oc.outputs.size());
+assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+LOG.info("Output: {}", oc.outputs.get(0));
+// The data should be accumulated to the previous window because the window allows 1 second of lateness
+checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "hello!")), oc.outputs.get(0).getValue());
+oc.outputs.clear();
+
+// LATE DATA
+// data timestamp: 4800
+// current watermark: 6000
+// data timestamp + allowed lateness < current watermark
+// It should not be accumulated to the prev window
+doFnTransform.onData(WindowedValue.of(
+  KV.of("1", "hello!"), new Instant(4800),
 
 Review comment:
   hello -> bye 
   
    



[GitHub] taegeonum opened a new pull request #173: [NEMO-307] Enable integration test

2018-12-03 Thread GitBox
taegeonum opened a new pull request #173: [NEMO-307] Enable integration test
URL: https://github.com/apache/incubator-nemo/pull/173
 
 
   JIRA: [NEMO-307: Enable integration test](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-307)
   
   **Major changes:**
   - Fix pom to enable integration test
   
   




[GitHub] taegeonum opened a new pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest

2018-12-03 Thread GitBox
taegeonum opened a new pull request #172: [NEMO-270] Test different triggers in 
GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172
 
 
   JIRA: [NEMO-270: Test different triggers in GroupByKeyAndWindowDoFnTransformTest](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-270)
   
   **Major changes:**
   - Fix `GroupByKeyAndWindowDoFnTransform` to properly handle `EARLY` and `LATE` triggering.
   
   **Tests for the changes:**
   - Add `eventTimeTriggerTest` to test complex triggering and lateness. 
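
The lateness rule exercised by `eventTimeTriggerTest` can be modelled without Beam. This is a simplified sketch under stated assumptions: fixed windows, non-negative millisecond timestamps, a window that counts as expired once its last timestamp (end - 1) plus the allowed lateness falls behind the watermark, and a pane that is ON_TIME only for the first firing after the watermark passes that last timestamp. It only labels firings; when early firings actually happen is left to the processing-time trigger, which is not modelled. `LatenessModel` and its methods are hypothetical names:

```java
/** Hypothetical, Beam-free model of fixed event-time windows with allowed lateness. */
public final class LatenessModel {
  /** Mirrors the pane timings checked in the test. */
  public enum Timing { EARLY, ON_TIME, LATE }

  private final long windowSizeMs;
  private final long allowedLatenessMs;

  public LatenessModel(final long windowSizeMs, final long allowedLatenessMs) {
    this.windowSizeMs = windowSizeMs;
    this.allowedLatenessMs = allowedLatenessMs;
  }

  /** Last timestamp of the fixed window containing the element (end - 1); assumes timestamps >= 0. */
  public long maxTimestamp(final long timestampMs) {
    final long start = timestampMs - (timestampMs % windowSizeMs);
    return start + windowSizeMs - 1;
  }

  /** True once the window's lateness horizon is behind the watermark, so the element is dropped. */
  public boolean isDroppable(final long timestampMs, final long watermarkMs) {
    return maxTimestamp(timestampMs) + allowedLatenessMs < watermarkMs;
  }

  /** Timing of a firing for the element's window, given whether the ON_TIME pane was already emitted. */
  public Timing timing(final long timestampMs, final long watermarkMs, final boolean onTimeFired) {
    if (watermarkMs <= maxTimestamp(timestampMs)) {
      return Timing.EARLY;
    }
    return onTimeFired ? Timing.LATE : Timing.ON_TIME;
  }

  public static void main(final String[] args) {
    final LatenessModel model = new LatenessModel(5000, 1000);
    System.out.println(model.timing(1, 2, false));      // EARLY
    System.out.println(model.timing(3, 5001, false));   // ON_TIME
    System.out.println(model.isDroppable(4500, 5001));  // false
    System.out.println(model.timing(4500, 5001, true)); // LATE
    System.out.println(model.isDroppable(4800, 6000));  // true
  }
}
```

With the test's parameters (5 s windows, 1 s lateness), the element at 4500 is still accepted at watermark 5001, while the one at 4800 is dropped at watermark 6000; both verdicts agree with the comments in the test.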




[GitHub] taegeonum closed pull request #171: [NEMO-306] Add license checkstyle

2018-12-03 Thread GitBox
taegeonum closed pull request #171: [NEMO-306] Add license checkstyle
URL: https://github.com/apache/incubator-nemo/pull/171
 
 
   




[GitHub] taegeonum closed pull request #163: [NEMO-294] Beam-Runner

2018-12-02 Thread GitBox
taegeonum closed pull request #163: [NEMO-294] Beam-Runner
URL: https://github.com/apache/incubator-nemo/pull/163
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index 41c1ef598..cbd082c7a 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_spark.sh b/bin/run_spark.sh
index 057b01747..314fd0d0b 100755
--- a/bin/run_spark.sh
+++ b/bin/run_spark.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 035d71985..f95bc73fc 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -101,6 +101,23 @@ private JobLauncher() {
* @throws Exception exception on the way.
*/
   public static void main(final String[] args) throws Exception {
+try {
+  setup(args);
+  // Launch client main. The shutdown() method is called inside the launchDAG() method.
+  runUserProgramMain(builtJobConf);
+} catch (final InjectionException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  /**
+   * Set up the driver, etc. before the actual execution.
+   * @param args arguments.
+   * @throws InjectionException injection exception from REEF.
+   * @throws ClassNotFoundException class not found exception.
+   * @throws IOException IO exception.
+   */
+  public static void setup(final String[] args) throws InjectionException, ClassNotFoundException, IOException {
 // Get Job and Driver Confs
 builtJobConf = getJobConf(args);
 
@@ -108,77 +125,76 @@ public static void main(final String[] args) throws Exception {
 LOG.info("Launching RPC Server");
 driverRPCServer = new DriverRPCServer();
 driverRPCServer
-.registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
-})
-.registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
-.registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
-.registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
-  SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()
-.run();
+  .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
+  })
+  .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
+  .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
+  .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
+    SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()
+  .run();
 
 final Configuration driverConf = getDriverConf(builtJobConf);
 final Configuration driverNcsConf = getDriverNcsConf();
-final Configuration driverMessageConfg = getDriverMessageConf();
+final Configuration driverMessageConfig = getDriverMessageConf();
+final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+  + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
 final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
-JobConf.ExecutorJSONContents.class);
+  JobConf.ExecutorJSONContents.class, 
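
The `JobLauncher` change keeps the latch-based handshake with the driver but moves the wiring into a public `setup(args)` that can run without launching a user program. The pattern can be sketched as follows, assuming a plain enum in place of `ControlMessage.DriverToClientMessageType` and an in-process call in place of `DriverRPCServer`; every name in the sketch is hypothetical:

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for JobLauncher's client-side handler wiring. */
public final class LatchedClient {
  /** Hypothetical counterparts of DriverStarted, DriverReady and ExecutionDone. */
  public enum MessageType { DRIVER_STARTED, DRIVER_READY, EXECUTION_DONE }

  public final CountDownLatch driverReadyLatch = new CountDownLatch(1);
  public final CountDownLatch jobDoneLatch = new CountDownLatch(1);
  private final Map<MessageType, Consumer<String>> handlers = new EnumMap<>(MessageType.class);

  /** Registers a handler and returns this, so registrations can be chained. */
  public LatchedClient registerHandler(final MessageType type, final Consumer<String> handler) {
    handlers.put(type, handler);
    return this;
  }

  /** Dispatches an incoming message; a type without a handler is treated as a bug. */
  public void onMessage(final MessageType type, final String payload) {
    final Consumer<String> handler = handlers.get(type);
    if (handler == null) {
      throw new IllegalStateException("No handler registered for " + type);
    }
    handler.accept(payload);
  }

  /** Wires the handlers only; unlike main(), it launches no user program. */
  public static LatchedClient setup() {
    final LatchedClient client = new LatchedClient();
    client
        .registerHandler(MessageType.DRIVER_STARTED, payload -> { })
        .registerHandler(MessageType.DRIVER_READY, payload -> client.driverReadyLatch.countDown())
        .registerHandler(MessageType.EXECUTION_DONE, payload -> client.jobDoneLatch.countDown());
    return client;
  }

  public static void main(final String[] args) throws InterruptedException {
    final LatchedClient client = setup();
    // Simulates the driver reporting readiness and completion from another thread.
    final Thread driver = new Thread(() -> {
      client.onMessage(MessageType.DRIVER_READY, "");
      client.onMessage(MessageType.EXECUTION_DONE, "");
    });
    driver.start();
    final boolean ready = client.driverReadyLatch.await(5, TimeUnit.SECONDS);
    final boolean done = client.jobDoneLatch.await(5, TimeUnit.SECONDS);
    System.out.println("ready=" + ready + ", done=" + done);
  }
}
```

Splitting setup from the user-program launch is what lets another entry point (for example a test harness) reuse the same wiring; the bounded `await` calls are a choice of this sketch, not something the diff shows.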

[GitHub] jooykim opened a new pull request #171: [NEMO-306] Add license checkstyle

2018-12-02 Thread GitBox
jooykim opened a new pull request #171: [NEMO-306] Add license checkstyle
URL: https://github.com/apache/incubator-nemo/pull/171
 
 
   JIRA: [NEMO-306: Apache License Checkstyle](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-306)
   
   **Major changes:**
   - Adds a checkstyle rule to check Apache license headers
   
   **Minor changes to note:**
   - N/A
   
   **Tests for the changes:**
   - N/A
   
   **Other comments:**
   - N/A
   
   




[GitHub] seojangho closed pull request #164: [NEMO-293] OOM exception in streaming

2018-12-02 Thread GitBox
seojangho closed pull request #164: [NEMO-293] OOM exception in streaming
URL: https://github.com/apache/incubator-nemo/pull/164
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 315760c99..12761b27f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -18,10 +18,14 @@
  */
 package org.apache.nemo.runtime.executor.bytetransfer;
 
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.FileArea;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +98,7 @@ public void close() throws IOException {
   currentByteOutputStream.close();
 }
 
channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId()))
-.addListener(getChannelWriteListener());
+  .addListener(getChannelWriteListener());
 deregister();
 closed = true;
   }
@@ -150,7 +154,7 @@ public void write(final byte[] bytes, final int offset, final int length) throws
  * @throws IOException when an exception has been set or this stream was closed
  */
 public ByteOutputStream writeSerializedPartition(final SerializedPartition serializedPartition)
-throws IOException {
+  throws IOException {
   write(serializedPartition.getData(), 0, serializedPartition.getLength());
   return this;
 }
@@ -177,7 +181,7 @@ public ByteOutputStream writeFileArea(final FileArea fileArea) throws IOExceptio
 }
 
 @Override
-public synchronized void close() throws IOException {
+public void close() throws IOException {
   if (closed) {
 return;
   }
@@ -198,19 +202,41 @@ private void writeByteBuf(final ByteBuf byteBuf) throws IOException {
   }
 }
 
+/**
+ * Write an element to the channel.
+ * @param element element
+ * @param serializer serializer
+ */
+public void writeElement(final Object element,
+ final Serializer serializer) {
+  final ByteBuf byteBuf = channel.alloc().ioBuffer();
+  final ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf);
+  try {
+final OutputStream wrapped =
+  DataUtil.buildOutputStream(byteBufOutputStream, serializer.getEncodeStreamChainers());
+final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
+encoder.encode(element);
+wrapped.close();
+
+writeByteBuf(byteBuf);
+  } catch (final IOException e) {
+throw new RuntimeException(e);
+  }
+}
+
 /**
  * Writes a data frame.
  * @param body    the body or {@code null}
  * @param length  the length of the body, in bytes
  * @throws IOException when an exception has been set or this stream was closed
  */
-private synchronized void writeDataFrame(final Object body, final long length) throws IOException {
+private void writeDataFrame(final Object body, final long length) throws IOException {
   ensureNoException();
   if (closed) {
 throw new IOException("Stream already closed.");
   }
   
channel.writeAndFlush(DataFrameEncoder.DataFrame.newInstance(getContextId(), body, length, newSubStream))
-  .addListener(getChannelWriteListener());
+.addListener(getChannelWriteListener());
   newSubStream = false;
 }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index d50ad82fc..9995e6a92 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -37,6 +37,8 @@
   /**
* It forwards output to the next operator.
* @param nextOperatorVertex next operator to emit data and watermark
+   * @param edgeIndex edge index
+   * @param watermarkManager watermark manager
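
The new `writeElement` serializes one element into a freshly allocated buffer by passing it through the serializer's stream chainers and encoder, then closing the chain before handing the bytes to the channel. A Netty-free sketch of the same sequence, assuming a `ByteArrayOutputStream` in place of `ByteBuf` and hypothetical `Encoder`/`Chainer` interfaces in place of Nemo's `EncoderFactory.Encoder` and `Serializer`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical, Netty-free sketch of encoding one element into a fresh buffer. */
public final class ElementWriter {
  /** Hypothetical stand-in for EncoderFactory.Encoder. */
  public interface Encoder<T> {
    void encode(T element, OutputStream out) throws IOException;
  }

  /** Hypothetical stream chainer: wraps a stream, e.g. to compress it. */
  public interface Chainer {
    OutputStream wrap(OutputStream out) throws IOException;
  }

  /** Encodes one element through the chainers and returns the resulting bytes. */
  public static <T> byte[] writeElement(final T element, final Encoder<T> encoder,
                                        final List<Chainer> chainers) {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try {
      OutputStream wrapped = buffer;
      for (final Chainer chainer : chainers) {
        wrapped = chainer.wrap(wrapped);
      }
      encoder.encode(element, wrapped);
      // Closing flushes every wrapper (e.g. writes the GZIP trailer) before the bytes are used.
      wrapped.close();
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(final String[] args) throws IOException {
    final Encoder<String> utf = (s, out) -> new DataOutputStream(out).writeUTF(s);
    final List<Chainer> gzip = Collections.singletonList(GZIPOutputStream::new);
    final byte[] bytes = writeElement("hello", utf, gzip);
    try (DataInputStream in =
             new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))) {
      System.out.println(in.readUTF()); // hello
    }
  }
}
```

Closing the outermost stream is the important step: for a compressing chainer it writes the trailer, and skipping it would leave the receiver with a truncated element.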
   

[GitHub] ejjeong opened a new pull request #170: [NEMO-10] Handle Method Javadocs Requirements for Checkstyle Warnings

2018-11-30 Thread GitBox
ejjeong opened a new pull request #170: [NEMO-10] Handle Method Javadocs 
Requirements for Checkstyle Warnings
URL: https://github.com/apache/incubator-nemo/pull/170
 
 
   JIRA: [NEMO-10: Handle Method Javadocs Requirements for Checkstyle Warnings](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-10)
   
   **Major changes:**
   - Handle Method Javadocs Requirements for Checkstyle Warnings in `compiler`
   
   **Minor changes to note:**
   - Removed the unused parameter of the `generateMetricCollectVertex` method in `SkewReshapingPass.java`.
   
   **Tests for the changes:**
   - Maven checkstyle tests.
   
   
   Closes #170




[GitHub] taegeonum commented on issue #164: [NEMO-293] OOM exception in streaming

2018-11-30 Thread GitBox
taegeonum commented on issue #164: [NEMO-293] OOM exception in streaming
URL: https://github.com/apache/incubator-nemo/pull/164#issuecomment-443131143
 
 
   @seojangho I've addressed your comments. Thanks!




[GitHub] taegeonum commented on issue #163: [NEMO-294] Beam-Runner

2018-11-30 Thread GitBox
taegeonum commented on issue #163: [NEMO-294] Beam-Runner
URL: https://github.com/apache/incubator-nemo/pull/163#issuecomment-443131052
 
 
   @johnyangk @seojangho Could you please take a look? 




[GitHub] seojangho closed pull request #168: [NEMO-305] Add DISCLAIMER

2018-11-30 Thread GitBox
seojangho closed pull request #168: [NEMO-305] Add DISCLAIMER
URL: https://github.com/apache/incubator-nemo/pull/168
 
 
   




[GitHub] jooykim opened a new pull request #168: [NEMO-305] Add DISCLAIMER

2018-11-30 Thread GitBox
jooykim opened a new pull request #168: [NEMO-305] Add DISCLAIMER
URL: https://github.com/apache/incubator-nemo/pull/168
 
 
   JIRA: [NEMO-305: Add Disclaimer](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-305)
   
   **Major changes:**
   - Adds the disclaimer file as per Apache's standards (https://incubator.apache.org/guides/branding.html#disclaimers)
   
   **Minor changes to note:**
   - N/A
   
   **Tests for the changes:**
   - N/A
   
   **Other comments:**
   - N/A
   




[GitHub] yunseong opened a new pull request #167: [NEMO-304] Fail-fast for mis-configuration in user application

2018-11-29 Thread GitBox
yunseong opened a new pull request #167: [NEMO-304] Fail-fast for 
mis-configuration in user application
URL: https://github.com/apache/incubator-nemo/pull/167
 
 
   JIRA: [NEMO-304: Fail-fast for mis-configuration in user application](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-304)
   
   **Major changes:**
   - This PR makes JobLauncher check the validity of the user main method early, so that the whole system is not initialized when there is a misconfiguration.
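
The fail-fast idea is to resolve the user's main class reflectively before any driver or executor is started. A minimal sketch, assuming the check only needs a loadable class with a public static `main(String[])`; `UserMainValidator` is hypothetical, and its nested exception merely borrows the name `InvalidUserMainException` from the PR's test, so the real type may differ:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical fail-fast check, meant to run before any driver or executor is started. */
public final class UserMainValidator {
  /** Hypothetical exception; only its name is borrowed from the PR's test. */
  public static final class InvalidUserMainException extends RuntimeException {
    public InvalidUserMainException(final String message, final Throwable cause) {
      super(message, cause);
    }
  }

  /** Returns the public static main(String[]) of the class, or throws if it cannot be used. */
  public static Method validate(final String className) {
    final Method main;
    try {
      main = Class.forName(className).getMethod("main", String[].class);
    } catch (final ClassNotFoundException | NoSuchMethodException e) {
      throw new InvalidUserMainException("Invalid user main class: " + className, e);
    }
    if (!Modifier.isStatic(main.getModifiers())) {
      throw new InvalidUserMainException(className + ".main(String[]) is not static", null);
    }
    return main;
  }

  public static void main(final String[] args) {
    System.out.println(validate(UserMainValidator.class.getName()).getName()); // main
    try {
      validate("com.example.DoesNotExist");
    } catch (final InvalidUserMainException e) {
      System.out.println("Rejected before launch: " + e.getMessage());
    }
  }
}
```

Running the check first means a typo in the user class name costs a reflective lookup instead of a full cluster start-up.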
   
   **Minor changes to note:**
   - 
   
   **Tests for the changes:**
   - Added `testNotExistingUserMain` in SparkScala.java to test whether `InvalidUserMainException` is thrown when the argument of `addUserMain()` is a nonexistent class.
   
   **Other comments:**
   - 
   
   Closes #GITHUB_PR_NUMBER
   




[GitHub] DifferentSC opened a new pull request #166: [NEMO-80] SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder

2018-11-29 Thread GitBox
DifferentSC opened a new pull request #166: [NEMO-80] SLF4J: Failed to load 
class org.slf4j.impl.StaticLoggerBinder
URL: https://github.com/apache/incubator-nemo/pull/166
 
 
   JIRA: [NEMO-###: 
TITLE](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-###)
   
   **Major changes:**
   - Add slf4j-simple dependency to fix the problem.
   
   **Minor changes to note:**
   - 
   
   **Tests for the changes:**
   - 
   
   **Other comments:**
   - 
   
   Closes #GITHUB_PR_NUMBER
   




[GitHub] seojangho commented on issue #164: [NEMO-293] OOM exception in streaming

2018-11-29 Thread GitBox
seojangho commented on issue #164: [NEMO-293] OOM exception in streaming
URL: https://github.com/apache/incubator-nemo/pull/164#issuecomment-443077811
 
 
   - From the batch perspective, that flag in `writeDataFrame` alone does not help, since using `writeFileArea` is also a common case.
   - Can you split the PR? Mixing a bugfix and refactoring makes little sense to me.




[GitHub] yunseong opened a new pull request #165: [NEMO-221] Confusing error messages for a failure at the client-side …

2018-11-29 Thread GitBox
yunseong opened a new pull request #165: [NEMO-221] Confusing error messages 
for a failure at the client-side …
URL: https://github.com/apache/incubator-nemo/pull/165
 
 
   …initialization
   
   JIRA: [NEMO-221: Confusing error messages for a failure at the client-side initialization](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-221)
   
   **Major changes:**
   - Throw a separate exception if the RPC connection between the Nemo client and the REEF Driver cannot be created (currently the IOException is swallowed by Tang's InjectionException). This way, we can find the actual reason for the failure, although the beginning of the stack trace is still quite lengthy because of the network failure. An example is as follows:
   ```
   Exception in thread "main" java.lang.RuntimeException: java.lang.Exception: 
Thread main threw an uncaught exception.
at org.apache.nemo.client.JobLauncher.main(JobLauncher.java:180)
   Caused by: java.lang.Exception: Thread main threw an uncaught exception.
at 
org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler.uncaughtException(REEFUncaughtExceptionHandler.java:68)
at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1057)
at java.lang.ThreadGroup.uncaughtException(ThreadGroup.java:1052)
at java.lang.Thread.dispatchUncaughtException(Thread.java:1959)
   Caused by: java.lang.RuntimeException: Unable to configure and start 
REEFEnvironment.
at 
org.apache.reef.runtime.common.REEFLauncher.fatal(REEFLauncher.java:202)
at 
org.apache.reef.runtime.common.REEFLauncher.main(REEFLauncher.java:183)
   Caused by: java.lang.RuntimeException: 
org.apache.reef.tang.exceptions.InjectionException: Could not invoke 
constructor: new ClientRPC(TransportFactory = [ClassNodeImpl 
'org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory']: 
org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory(org.apache.reef.wake.remote.address.LocalAddressProvider),
  = 
org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory@6bb4dd34, 
LocalAddressProvider = [ClassNodeImpl 
'org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider']: 
org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider(),  = 
org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider@7d9f158f, 
String ClientSideRPCServerHost = 127.0.0.1, Integer ClientSideRPCServerPort = 
19888)
   ...
   (25 lines)
   ...
at 
org.apache.reef.runtime.common.REEFEnvironment.fromConfiguration(REEFEnvironment.java:69)
at 
org.apache.reef.runtime.common.REEFLauncher.main(REEFLauncher.java:180)
   Caused by: org.apache.reef.tang.exceptions.InjectionException: Could not 
invoke constructor: new ClientRPC(TransportFactory = [ClassNodeImpl 
'org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory']: 
org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory(org.apache.reef.wake.remote.address.LocalAddressProvider),
  = 
org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory@6bb4dd34, 
LocalAddressProvider = [ClassNodeImpl 
'org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider']: 
org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider(),  = 
org.apache.reef.wake.remote.address.HostnameBasedLocalAddressProvider@7d9f158f, 
String ClientSideRPCServerHost = 127.0.0.1, Integer ClientSideRPCServerPort = 
19888)
at 
org.apache.reef.tang.implementation.java.InjectorImpl.injectFromPlan(InjectorImpl.java:654)
   ...
   (10 lines)
   ...
 at 
org.apache.reef.tang.implementation.java.InjectorImpl.getNamedInstance(InjectorImpl.java:546)
at org.apache.reef.tang.InjectionFuture.get(InjectionFuture.java:116)
... 26 more
   Caused by: java.lang.IllegalStateException: Failed to setup an RPC 
connection to the Client. A failure at the client-side is suspected.
at org.apache.nemo.runtime.master.ClientRPC.<init>(ClientRPC.java:68)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at 
org.apache.reef.tang.implementation.java.InjectorImpl.injectFromPlan(InjectorImpl.java:637)
... 37 more
   ```
   
   **Minor changes to note:**
   - 
   
   **Tests for the changes:**
   - 
   
   **Other comments:**
   -
   
   Closes #GITHUB_PR_NUMBER
   



[GitHub] wonook commented on a change in pull request #163: [NEMO-294] Beam-Runner

2018-11-28 Thread GitBox
wonook commented on a change in pull request #163: [NEMO-294] Beam-Runner
URL: https://github.com/apache/incubator-nemo/pull/163#discussion_r237151236
 
 

 ##
 File path: client/src/main/java/org/apache/nemo/client/JobLauncher.java
 ##
 @@ -101,84 +101,100 @@ private JobLauncher() {
* @throws Exception exception on the way.
*/
   public static void main(final String[] args) throws Exception {
+try {
+  setup(args);
+  // Launch client main. The shutdown() method is called inside the 
launchDAG() method.
+  runUserProgramMain(builtJobConf);
+} catch (final InjectionException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  /**
+   * Set up the driver, etc. before the actual execution.
+   * @param args arguments.
+   * @throws InjectionException injection exception from REEF.
+   * @throws ClassNotFoundException class not found exception.
+   * @throws IOException IO exception.
+   */
+  public static void setup(final String[] args) throws InjectionException, 
ClassNotFoundException, IOException {
 // Get Job and Driver Confs
 builtJobConf = getJobConf(args);
 
 // Registers actions for launching the DAG.
 LOG.info("Launching RPC Server");
 driverRPCServer = new DriverRPCServer();
 driverRPCServer
-
.registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event 
-> {
-})
-.registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, 
event -> driverReadyLatch.countDown())
-
.registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event 
-> jobDoneLatch.countDown())
-
.registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, 
message -> COLLECTED_DATA.addAll(
-
SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()
-.run();
+  .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, 
event -> {
+  })
+  .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, 
event -> driverReadyLatch.countDown())
+  .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, 
event -> jobDoneLatch.countDown())
+  .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, 
message -> COLLECTED_DATA.addAll(
+
SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()
+  .run();
 
 final Configuration driverConf = getDriverConf(builtJobConf);
 final Configuration driverNcsConf = getDriverNcsConf();
-final Configuration driverMessageConfg = getDriverMessageConf();
+final Configuration driverMessageConfig = getDriverMessageConf();
+final String defaultExecutorResourceConfig = 
"[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
 
 Review comment:
   When building a JAR file and executing it, the JSON file unfortunately does not get included in the JAR. I've tried refactoring it into the `src/main/resources` directory, but that didn't work either. This change makes it possible to apply a default `executor.json` even when one is not explicitly specified.
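The fallback described above can be sketched roughly as follows, assuming the configuration is first looked up as a classpath resource (the usual way to read a file packaged from `src/main/resources` inside a JAR) and replaced by an inline default string when it is absent. The class name, `loadOrDefault`, and the fallback JSON value are hypothetical; the actual default string in the diff is truncated in this excerpt.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: load an executor config from the classpath, else use an inline default. */
final class DefaultExecutorConfig {
  // Hypothetical fallback value for illustration only; not the string used in the PR.
  static final String FALLBACK_JSON =
      "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5}]";

  private DefaultExecutorConfig() {
  }

  /**
   * Returns the contents of the named classpath resource, or FALLBACK_JSON when the
   * resource was not packaged into the JAR.
   */
  static String loadOrDefault(final String resourceName) throws IOException {
    final ClassLoader loader = DefaultExecutorConfig.class.getClassLoader();
    // getResourceAsStream returns null (rather than throwing) when the resource is missing;
    // try-with-resources skips close() for a null resource.
    try (InputStream in = loader.getResourceAsStream(resourceName)) {
      if (in == null) {
        return FALLBACK_JSON;
      }
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      final byte[] chunk = new byte[4096];
      int n;
      while ((n = in.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }
}
```

Resource names given to `ClassLoader.getResourceAsStream` are resolved from the classpath root, so in a standard Maven layout a file at `src/main/resources/executors/default.json` would be requested as `executors/default.json`.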




[GitHub] taegeonum opened a new pull request #164: [NEMO-293] OOM exception in streaming

2018-11-28 Thread GitBox
taegeonum opened a new pull request #164: [NEMO-293] OOM exception in streaming
URL: https://github.com/apache/incubator-nemo/pull/164
 
 
   JIRA: [NEMO-293: OOM exception in 
streaming](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-293)
   
   **Major changes:**
   - Fix wrong byte encoding in `PipeOutputWriter`, which caused OOM because it sent unnecessary bytes: only the first `count` bytes of the byte array are valid (count <= byte array size), but more than that was sent.
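This class of bug typically appears when a buffer's backing array is written out in full instead of only its first `count` bytes. A minimal sketch, assuming a `ByteArrayOutputStream` subclass that exposes its protected `buf` and `count` fields; the class and method names are hypothetical and this is not the actual `PipeOutputWriter` code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stream that exposes its backing array without copying. */
final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
  /** Backing array; usually longer than the number of bytes actually written. */
  byte[] backingArray() {
    return buf;
  }

  /** Number of valid bytes at the front of backingArray(). */
  int validCount() {
    return count;
  }
}

final class PipeWriteSketch {
  private PipeWriteSketch() {
  }

  /** Buggy variant: ships the whole backing array, including unused trailing capacity. */
  static void writeWholeArray(final ExposedByteArrayOutputStream src, final OutputStream out)
      throws IOException {
    out.write(src.backingArray());
  }

  /** Fixed variant: ships only the bytes that were actually written. */
  static void writeValidBytes(final ExposedByteArrayOutputStream src, final OutputStream out)
      throws IOException {
    out.write(src.backingArray(), 0, src.validCount());
  }
}
```

With the default 32-byte initial capacity, writing three bytes and then calling `writeWholeArray` sends 32 bytes, 29 of which were never written; the receiver has to buffer all of them.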




[GitHub] sj6077 commented on issue #146: [NEMO-244] Organize integration tests and resources

2018-11-28 Thread GitBox
sj6077 commented on issue #146: [NEMO-244] Organize integration tests and 
resources
URL: https://github.com/apache/incubator-nemo/pull/146#issuecomment-442362535
 
 
   @sanha Please merge this PR.




[GitHub] wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo

2018-11-27 Thread GitBox
wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to 
use RESTful APIs by Nemo
URL: https://github.com/apache/incubator-nemo/pull/95#discussion_r236939076
 
 

 ##
 File path: .travis.yml
 ##
 @@ -17,17 +17,29 @@
 # under the License.
 
 # .travis.yml
-# For maven builds
-language: java
+language:
+  # For maven builds
+  - java
+  # For web ui build
+  - node_js
 dist: trusty
+<<< HEAD
 
 Review comment:
   This is preventing Travis CI from checking this PR. Can this be fixed? I think just replacing `sudo:false` with `sudo:required` in the original code would be sufficient. @skystar-p or @seojangho? I don't have access to the original branch.




[GitHub] wonook opened a new pull request #163: [NEMO-294] Beam-Runner

2018-11-26 Thread GitBox
wonook opened a new pull request #163: [NEMO-294] Beam-Runner
URL: https://github.com/apache/incubator-nemo/pull/163
 
 
   JIRA: [NEMO-294: Support Nemo Runner execution by providing PipelineOptions 
to the Beam 
program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-294)
   
   **Major changes:**
   - Update launch scripts to launch shaded jars with the current project 
version, instead of 0.1-SNAPSHOT
   - Refactor `JobLauncher` so that jobs can be launched from the JobLauncher's main class as well as from a particular Beam application. This also allows the job ID to be received from the application itself.
   - Use Google's AutoService from Beam to register `NemoRunner` as one of Beam's runners via `NemoRunnerRegistrar`.
   - Rename `NemoPipelineRunner` to `NemoRunner` to follow conventions.
   - Instead of a default `executor_json` file, provide a default JSON string, so that no JSON file is required by default.
   - Add the MinimalWordCount example from the Beam homepage (from the quickstart article).
   
   **Minor changes to note:**
   - Fix minor typos (e.g., confg --> config)
   - Fix minor indentation inconsistencies
   
   **Tests for the changes:**
   - Existing tests confirm that these changes do not break the code
   
   **Other comments:**
   - N/A
   
   Closes #GITHUB_PR_NUMBER




[GitHub] johnyangk closed pull request #162: [NEMO-303] Use Surefire 3.0.0-M1

2018-11-25 Thread GitBox
johnyangk closed pull request #162: [NEMO-303] Use Surefire 3.0.0-M1
URL: https://github.com/apache/incubator-nemo/pull/162
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 20cfc45af..7ed51fe03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
 
 2.13.0
 2.0.0-beta.5
+3.0.0-M1
 4.12
 
 
@@ -156,23 +157,28 @@ under the License.
 org.codehaus.mojo
 sonar-maven-plugin
 
+
+org.apache.maven.plugins
+maven-surefire-plugin
+${surefire.version}
+
+
+org.apache.maven.plugins
+maven-failsafe-plugin
+${surefire.version}
+
+
+
+integration-test
+verify
+
+
+
+
 
 
 
 
-
-org.apache.maven.plugins
-maven-failsafe-plugin
-2.20.1
-
-
-
-integration-test
-verify
-
-
-
-
 
 org.apache.maven.plugins
 maven-checkstyle-plugin


 




[GitHub] seojangho opened a new pull request #162: [NEMO-303] Use Surefire 3.0.0-M1

2018-11-25 Thread GitBox
seojangho opened a new pull request #162: [NEMO-303] Use Surefire 3.0.0-M1
URL: https://github.com/apache/incubator-nemo/pull/162
 
 
   JIRA: [NEMO-303: Use Surefire 
3.0.0-M1](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-303)
   
   **Minor changes to note:**
   - POM: Use maven-surefire-plugin 3.0.0-M1 and maven-failsafe-plugin 3.0.0-M1
   
   **Notes**
   - See SUREFIRE-1588 for the relevant bug report at the ASF Jira system




[GitHub] wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to use RESTful APIs by Nemo

2018-11-24 Thread GitBox
wonook commented on a change in pull request #95: [NEMO-25] Improve WebUI to 
use RESTful APIs by Nemo
URL: https://github.com/apache/incubator-nemo/pull/95#discussion_r236059450
 
 

 ##
 File path: .travis.yml
 ##
 @@ -17,17 +17,29 @@
 # under the License.
 
 # .travis.yml
-# For maven builds
-language: java
+language:
+  # For maven builds
+  - java
+  # For web ui build
+  - node_js
 dist: trusty
+<<< HEAD
 
 Review comment:
   This must have gotten in there by mistake.




[GitHub] johnyangk closed pull request #161: [NEMO-300] Fix starvation when handling multiple pending data fetchers

2018-11-21 Thread GitBox
johnyangk closed pull request #161: [NEMO-300] Fix starvation when handling 
multiple pending data fetchers
URL: https://github.com/apache/incubator-nemo/pull/161
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index b08b12a25..af3197999 100644
--- 
a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -461,31 +461,34 @@ private boolean handleDataFetchers(final 
List fetchers) {
 
   final Iterator pendingIterator = pendingFetchers.iterator();
   final long currentTime = System.currentTimeMillis();
-  // We check pending data every polling interval
-  while (pendingIterator.hasNext()
-&& isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+
+
+  if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+// We check pending data every polling interval
 prevPollingTime = currentTime;
 
-final DataFetcher dataFetcher = pendingIterator.next();
-try {
-  final Object element = dataFetcher.fetchDataElement();
-  onEventFromDataFetcher(element, dataFetcher);
+while (pendingIterator.hasNext()) {
+  final DataFetcher dataFetcher = pendingIterator.next();
+  try {
+final Object element = dataFetcher.fetchDataElement();
+onEventFromDataFetcher(element, dataFetcher);
+
+// We processed data. This means the data fetcher is now available.
+// Add current data fetcher to available
+pendingIterator.remove();
+if (!(element instanceof Finishmark)) {
+  availableFetchers.add(dataFetcher);
+}
 
-  // We processed data. This means the data fetcher is now available.
-  // Add current data fetcher to available
-  pendingIterator.remove();
-  if (!(element instanceof Finishmark)) {
-availableFetchers.add(dataFetcher);
+  } catch (final NoSuchElementException e) {
+// The current data fetcher is still pending.. try next data 
fetcher
+  } catch (final IOException e) {
+// IOException means that this task should be retried.
+taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+  Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e);
+return false;
   }
-
-} catch (final NoSuchElementException e) {
-  // The current data fetcher is still pending.. try next data fetcher
-} catch (final IOException e) {
-  // IOException means that this task should be retried.
-  taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
-Optional.empty(), 
Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
-  LOG.error("{} Execution Failed (Recoverable: input read failure)! 
Exception: {}", taskId, e);
-  return false;
 }
   }
 


 

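In the diff above, the polling-interval check now gates one whole pass over the pending fetchers. Before the fix it sat in the `while` condition, and because `prevPollingTime` was reset inside the loop body, the second iteration already saw the interval as not elapsed, so only the first pending fetcher was polled per interval. A simplified, self-contained sketch of the fixed pattern follows; the `PendingFetcher` interface and `PendingPoller` class are hypothetical stand-ins for Nemo's `DataFetcher` and `TaskExecutor`, and the `Finishmark` and retry handling are omitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a data fetcher that may not have data yet. */
interface PendingFetcher {
  /** Returns the next element, or throws NoSuchElementException if none is ready. */
  Object fetch();
}

final class PendingPoller {
  private final long pollingIntervalMs;
  private long prevPollingTime = 0;
  final List<PendingFetcher> pending = new ArrayList<>();
  final List<PendingFetcher> available = new ArrayList<>();
  final List<Object> processed = new ArrayList<>();

  PendingPoller(final long pollingIntervalMs) {
    this.pollingIntervalMs = pollingIntervalMs;
  }

  /** Polls every pending fetcher once if the polling interval has elapsed. */
  void pollPending(final long currentTimeMs) {
    if (currentTimeMs - prevPollingTime < pollingIntervalMs) {
      return; // not yet time to poll
    }
    prevPollingTime = currentTimeMs; // reset once per pass, not once per fetcher
    final Iterator<PendingFetcher> it = pending.iterator();
    while (it.hasNext()) {
      final PendingFetcher fetcher = it.next();
      try {
        processed.add(fetcher.fetch());
        it.remove(); // it produced data, so it is no longer pending
        available.add(fetcher);
      } catch (final NoSuchElementException e) {
        // Still pending: move on to the next fetcher instead of ending the pass.
      }
    }
  }
}
```

A fetcher that stays empty no longer blocks the ones behind it: every pending fetcher gets one attempt per interval.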



[GitHub] taegeonum closed pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-21 Thread GitBox
taegeonum closed pull request #159: [NEMO-216,251,259] Support side inputs and 
windowing
URL: https://github.com/apache/incubator-nemo/pull/159
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java 
b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index a6269a085..6a6ca4d43 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -20,7 +20,6 @@
 
 import org.apache.nemo.common.exception.CompileTimeOptimizationException;
 import org.apache.nemo.common.ir.edge.IREdge;
-import 
org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import 
org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.*;
@@ -258,14 +257,6 @@ private void sinkCheck() {
* Helper method to check that all execution properties are correct and 
makes sense.
*/
   private void executionPropertyCheck() {
-// SideInput is not compatible with Push
-vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e 
instanceof IREdge).map(e -> (IREdge) e)
-.filter(e -> 
e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
-.filter(e -> 
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
-.forEach(e -> {
-  throw new CompileTimeOptimizationException("DAG execution property 
check: "
-  + "Broadcast edge is not compatible with push" + e.getId());
-}));
 // DataSizeMetricCollection is not compatible with Push (All data have to 
be stored before the data collection)
 vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e 
instanceof IREdge).map(e -> (IREdge) e)
 .filter(e -> 
Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
diff --git 
a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java 
b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index e676e6e8c..1055f0bb9 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -49,7 +49,7 @@ public boolean equals(final Object o) {
 
   @Override
   public String toString() {
-return String.valueOf(timestamp);
+return String.valueOf("Watermark(" + timestamp + ")");
   }
 
   @Override
diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
new file mode 100644
index 0..9a18b0c9f
--- /dev/null
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Accumulates and provides side inputs in memory.
+ */
+public final class InMemorySideInputReader implements 
ReadyCheckingSideInputReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InMemorySideInputReader.class.getName());
+
+  private long curWatermark = Long.MIN_VALUE;
+
+  private final Collection> sideInputsToRead;
+  private final Map, 

[GitHub] johnyangk commented on issue #160: [NEMO-2186] Parallelism=1 for PCollectionView

2018-11-21 Thread GitBox
johnyangk commented on issue #160: [NEMO-2186] Parallelism=1 for PCollectionView
URL: https://github.com/apache/incubator-nemo/pull/160#issuecomment-440575480
 
 
   @taegeonum Please review this after #159 is merged.




[GitHub] johnyangk opened a new pull request #160: [NEMO-2186] Parallelism=1 for PCollectionView

2018-11-21 Thread GitBox
johnyangk opened a new pull request #160: [NEMO-2186] Parallelism=1 for 
PCollectionView
URL: https://github.com/apache/incubator-nemo/pull/160
 
 
   JIRA: [NEMO-286: Parallelism=1 for 
PCollectionView](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-286)
   
   **Major changes:**
   - Set Parallelism=1 for side input stages
   - Set CommPattern=Broadcast from/to side input stages




[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235279915
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 ##
 @@ -132,10 +145,14 @@ public final DoFn getDoFn() {
* Checks whether the bundle is finished or not.
* Starts the bundle if it is done.
*/
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
 if (bundleFinished) {
   bundleFinished = false;
-  doFnRunner.startBundle();
+  if (pushBackRunner == null) {
 
 Review comment:
   That will give us 3 duplicate methods.
   I left a comment on refactoring. Maybe we can come back to this when implementing the streaming partial combiner (i.e., combining ReduceFn), which will probably also use pushBackRunner?
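One way to avoid repeating the `pushBackRunner == null` branch in every bundle method is to resolve the active runner in a single place. A minimal sketch, assuming both runners can be viewed through one common bundle interface; `BundleRunner` and `RunnerSelectionSketch` are hypothetical names, not Beam types.

```java
/** Hypothetical minimal view of a runner's bundle lifecycle. */
interface BundleRunner {
  void startBundle();

  void finishBundle();
}

final class RunnerSelectionSketch {
  private final BundleRunner doFnRunner;
  private final BundleRunner pushBackRunner; // null when there are no side inputs

  RunnerSelectionSketch(final BundleRunner doFnRunner, final BundleRunner pushBackRunner) {
    this.doFnRunner = doFnRunner;
    this.pushBackRunner = pushBackRunner;
  }

  /** The single place where the null check lives. */
  private BundleRunner activeRunner() {
    return pushBackRunner == null ? doFnRunner : pushBackRunner;
  }

  void startBundle() {
    activeRunner().startBundle();
  }

  void finishBundle() {
    activeRunner().finishBundle();
  }
}
```

If the real runner types do not share such an interface, each would need a small adapter, which is extra code of its own and may be a reason to defer this refactoring as suggested.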




[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235279915
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 ##
 @@ -132,10 +145,14 @@ public final DoFn getDoFn() {
* Checks whether the bundle is finished or not.
* Starts the bundle if it is done.
*/
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
 if (bundleFinished) {
   bundleFinished = false;
-  doFnRunner.startBundle();
+  if (pushBackRunner == null) {
 
 Review comment:
   That will give us 3 duplicate methods.
   I left a comment on refactoring. Maybe we can come back to this when implementing the streaming partial combiner (i.e., combining ReduceFn)?




[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277020
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 ##
 @@ -132,25 +150,46 @@ public final DoFn getDoFn() {
* Checks whether the bundle is finished or not.
* Starts the bundle if it is done.
*/
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
 if (bundleFinished) {
   bundleFinished = false;
-  doFnRunner.startBundle();
+  if (pushBackRunner == null) {
+doFnRunner.startBundle();
+  } else {
+pushBackRunner.startBundle();
+  }
   prevBundleStartTime = System.currentTimeMillis();
   currBundleCount = 0;
 }
 currBundleCount += 1;
   }
 
-
   /**
* Checks whether it is time to finish the bundle and finish it.
*/
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
 if (!bundleFinished) {
   if (currBundleCount >= bundleSize || System.currentTimeMillis() - 
prevBundleStartTime >= bundleMillis) {
 bundleFinished = true;
+if (pushBackRunner == null) {
+  doFnRunner.finishBundle();
+} else {
+  pushBackRunner.finishBundle();
+}
+  }
+}
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
 
 Review comment:
   This is also used in AbstractDoFnTransform#close
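The bundle bookkeeping in this diff starts a bundle lazily, finishes it once `bundleSize` elements have been counted or `bundleMillis` have passed, and can force-finish it. It can be sketched in isolation as below. This is a simplified illustration with hypothetical names: the clock is passed in explicitly so the behavior is deterministic, the runner calls are replaced by counters, and the body of `forceFinishBundle` (truncated in the excerpt) is assumed to finish only a bundle that is still open.

```java
/** Hypothetical, simplified model of count/time-based bundling. */
final class BundleTracker {
  private final long bundleSize;
  private final long bundleMillis;
  private boolean bundleFinished = true;
  private long prevBundleStartTime;
  private long currBundleCount;
  int startedBundles;  // stands in for runner.startBundle()
  int finishedBundles; // stands in for runner.finishBundle()

  BundleTracker(final long bundleSize, final long bundleMillis) {
    this.bundleSize = bundleSize;
    this.bundleMillis = bundleMillis;
  }

  /** Called before processing an element: starts a new bundle if none is open. */
  void checkAndInvokeBundle(final long nowMs) {
    if (bundleFinished) {
      bundleFinished = false;
      startedBundles++;
      prevBundleStartTime = nowMs;
      currBundleCount = 0;
    }
    currBundleCount += 1;
  }

  /** Called after processing an element: finishes the bundle if a threshold is reached. */
  void checkAndFinishBundle(final long nowMs) {
    if (!bundleFinished
        && (currBundleCount >= bundleSize || nowMs - prevBundleStartTime >= bundleMillis)) {
      bundleFinished = true;
      finishedBundles++;
    }
  }

  /** Finishes any open bundle unconditionally, e.g. when the transform is closed. */
  void forceFinishBundle() {
    if (!bundleFinished) {
      bundleFinished = true;
      finishedBundles++;
    }
  }
}
```

Force-finishing on close covers the final bundle, which may never reach either threshold; without it, that bundle would never be finished.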




[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277346
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param  input type.
+ * @param  output type.
+ */
+public final class PushBackDoFnTransform extends 
AbstractDoFnTransform {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+  private List> curPushedBacks;
+  private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back 
exists.
+  private long curInputWatermark;
+  private long curOutputWatermark;
+
+  /**
+   * PushBackDoFnTransform Constructor.
+   */
+  public PushBackDoFnTransform(final DoFn doFn,
+   final Coder inputCoder,
+   final Map, Coder> outputCoders,
+   final TupleTag mainOutputTag,
+   final List> additionalOutputTags,
+   final WindowingStrategy windowingStrategy,
+   final Map> 
sideInputs,
+   final PipelineOptions options,
+   final DisplayData displayData) {
+super(doFn, inputCoder, outputCoders, mainOutputTag,
+  additionalOutputTags, windowingStrategy, sideInputs, options, 
displayData);
+this.curPushedBacks = new ArrayList<>();
+this.curPushedBackWatermark = Long.MAX_VALUE;
+this.curInputWatermark = Long.MIN_VALUE;
+this.curOutputWatermark = Long.MIN_VALUE;
+  }
+
+  @Override
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+return initDoFn;
+  }
+
+  @Override
+  public void onData(final WindowedValue data) {
+// Need to distinguish side/main inputs and push-back main inputs.
+if (data.getValue() instanceof SideInputElement) {
+  // This element is a Side Input
+  // TODO #287: Consider Explicit Multi-Input IR Transform
+  final WindowedValue sideInputElement = 
(WindowedValue) data;
+  final PCollectionView view = 
getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+  getSideInputReader().addSideInputElement(view, data);
+
+  handlePushBacks();
+
+  // See if we can emit a new watermark, as we may have processed some 
pushed-back elements
+  onWatermark(new Watermark(curInputWatermark));
+} else {
+  // This element is the Main Input
+  checkAndInvokeBundle();
+  final Iterable> pushedBack =
+getPushBackRunner().processElementInReadyWindows(data);
+  for (final WindowedValue wv : pushedBack) {
+curPushedBackWatermark = Math.min(curPushedBackWatermark, 
wv.getTimestamp().getMillis());
+curPushedBacks.add(wv);
+  }
+  checkAndFinishBundle();
+}
+  }
+
+  private void handlePushBacks() {
+// Force-flush, before (possibly) processing pushed-back data.
+//
+// Main reason:
+// {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+// caches for each bundle the side inputs that are not ready.
+// We need to 

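The fields of `PushBackDoFnTransform` suggest a watermark hold. Main-input elements pushed back while side inputs are not ready are buffered, and `curPushedBackWatermark` tracks the smallest pushed-back timestamp (`Long.MAX_VALUE` when none), which the emitted watermark should presumably not pass. Because `onWatermark` and `handlePushBacks` are truncated in this excerpt, the following is only a sketch of that idea. It assumes the output watermark is the minimum of the input watermark and the pushed-back hold; the `PushBackHold` class and its methods are hypothetical and use plain timestamps instead of Beam's `WindowedValue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/** Hypothetical model of a watermark hold for pushed-back main-input elements. */
final class PushBackHold {
  private final List<Long> pushedBackTimestamps = new ArrayList<>();
  private long curPushedBackWatermark = Long.MAX_VALUE; // MAX_VALUE when nothing is pushed back
  private long curInputWatermark = Long.MIN_VALUE;

  /** Buffers an element whose side inputs are not ready yet. */
  void pushBack(final long timestampMs) {
    pushedBackTimestamps.add(timestampMs);
    curPushedBackWatermark = Math.min(curPushedBackWatermark, timestampMs);
  }

  /** Retries buffered elements; isReady decides which ones can now be processed. */
  void handlePushBacks(final LongPredicate isReady) {
    pushedBackTimestamps.removeIf(ts -> isReady.test(ts));
    long min = Long.MAX_VALUE;
    for (final long ts : pushedBackTimestamps) {
      min = Math.min(min, ts);
    }
    curPushedBackWatermark = min; // recompute the hold from what is still buffered
  }

  void onInputWatermark(final long watermarkMs) {
    curInputWatermark = watermarkMs;
  }

  /** The output watermark may not advance past any still-buffered element. */
  long outputWatermark() {
    return Math.min(curInputWatermark, curPushedBackWatermark);
  }
}
```

Holding the watermark this way keeps downstream operators from treating a window as complete while some of its elements are still waiting for side inputs.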
[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235277341
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param  input type.
+ * @param  output type.
+ */
+public final class PushBackDoFnTransform extends AbstractDoFnTransform {
+  private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+  private List> curPushedBacks;
+  private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists.
+  private long curInputWatermark;
+  private long curOutputWatermark;
+
+  /**
+   * PushBackDoFnTransform Constructor.
+   */
+  public PushBackDoFnTransform(final DoFn doFn,
+                               final Coder inputCoder,
+                               final Map, Coder> outputCoders,
+                               final TupleTag mainOutputTag,
+                               final List> additionalOutputTags,
+                               final WindowingStrategy windowingStrategy,
+                               final Map> sideInputs,
+                               final PipelineOptions options,
+                               final DisplayData displayData) {
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options, displayData);
+    this.curPushedBacks = new ArrayList<>();
+    this.curPushedBackWatermark = Long.MAX_VALUE;
+    this.curInputWatermark = Long.MIN_VALUE;
+    this.curOutputWatermark = Long.MIN_VALUE;
+  }
+
+  @Override
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
+  }
+
+  @Override
+  public void onData(final WindowedValue data) {
+    // Need to distinguish side/main inputs and push-back main inputs.
+    if (data.getValue() instanceof SideInputElement) {
+      // This element is a Side Input
+      // TODO #287: Consider Explicit Multi-Input IR Transform
+      final WindowedValue sideInputElement = (WindowedValue) data;
+      final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+      getSideInputReader().addSideInputElement(view, data);
+
+      handlePushBacks();
+
+      // See if we can emit a new watermark, as we may have processed some pushed-back elements
+      onWatermark(new Watermark(curInputWatermark));
+    } else {
+      // This element is the Main Input
+      checkAndInvokeBundle();
+      final Iterable> pushedBack =
+        getPushBackRunner().processElementInReadyWindows(data);
+      for (final WindowedValue wv : pushedBack) {
+        curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis());
+        curPushedBacks.add(wv);
+      }
+      checkAndFinishBundle();
+    }
+  }
+
+  private void handlePushBacks() {
+    // Force-flush, before (possibly) processing pushed-back data.
+    //
+    // Main reason:
+    // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+    // caches for each bundle the side inputs that are not ready.
+    // We need to 
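
The comment in `handlePushBacks` explains the force-flush: `SimplePushbackSideInputDoFnRunner` caches, for each bundle, the side inputs that are not ready. The sketch below models only that effect, under one assumption: a window found not ready is remembered until the bundle finishes. It shows that a side input arriving in the middle of a bundle stays invisible until the bundle is finished and restarted. `CachingPushbackRunner` is a hypothetical class, not Beam's runner. Windows are plain strings and the side-input store is a `Set`; both are simplifications.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a runner that caches, per bundle, the windows whose side inputs were not ready.
final class CachingPushbackRunner {
  private final Set<String> sideInputReadyWindows; // shared side-input store, updated as side inputs arrive
  private Set<String> notReadyWindows; // per-bundle cache; only valid between startBundle and finishBundle

  CachingPushbackRunner(final Set<String> sideInputReadyWindows) {
    this.sideInputReadyWindows = sideInputReadyWindows;
  }

  void startBundle() {
    notReadyWindows = new HashSet<>();
  }

  void finishBundle() {
    notReadyWindows = null; // the cache is dropped only here
  }

  private boolean isReady(final String window) {
    if (notReadyWindows.contains(window)) {
      return false; // stale if the side input arrived after this window was cached
    }
    if (!sideInputReadyWindows.contains(window)) {
      notReadyWindows.add(window);
      return false;
    }
    return true;
  }

  /** Processes elements (identified here by their window) and returns the ones pushed back. */
  List<String> processAll(final List<String> windows) {
    final List<String> pushedBack = new ArrayList<>();
    for (final String w : windows) {
      if (!isReady(w)) {
        pushedBack.add(w);
      }
    }
    return pushedBack;
  }
}
```

If the bundle is not finished after a side input arrives, the cached "not ready" answer wins and the element is pushed back again. Finishing and restarting the bundle clears the cache, so the retry succeeds.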

[GitHub] johnyangk commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
johnyangk commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235275944
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@

[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247765
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@

[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247665
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
 ##
 @@ -132,25 +150,46 @@ public final DoFn getDoFn() {
    * Checks whether the bundle is finished or not.
    * Starts the bundle if it is done.
    */
-  protected final void checkAndInvokeBundle() {
+  final void checkAndInvokeBundle() {
     if (bundleFinished) {
       bundleFinished = false;
-      doFnRunner.startBundle();
+      if (pushBackRunner == null) {
+        doFnRunner.startBundle();
+      } else {
+        pushBackRunner.startBundle();
+      }
       prevBundleStartTime = System.currentTimeMillis();
       currBundleCount = 0;
     }
     currBundleCount += 1;
   }
 
-
   /**
    * Checks whether it is time to finish the bundle and finish it.
    */
-  protected final void checkAndFinishBundle() {
+  final void checkAndFinishBundle() {
     if (!bundleFinished) {
       if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
         bundleFinished = true;
+        if (pushBackRunner == null) {
+          doFnRunner.finishBundle();
+        } else {
+          pushBackRunner.finishBundle();
+        }
+      }
+    }
+  }
+
+  /**
+   * Finish bundle without checking for conditions.
+   */
+  final void forceFinishBundle() {
 
 Review comment:
   This is only used for PushBackRunner.
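
The hunk above starts a bundle lazily on the first element. It closes the bundle once it holds `bundleSize` elements or has been open for `bundleMillis`. Below is a sketch of that policy with the clock injected, so it can be exercised deterministically. `BundleTracker` and its start/finish callbacks are hypothetical. The body of `forceFinishBundle` is not shown in the diff, so the sketch's choice to finish only a bundle that is still open is an assumption.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the size/time bundling policy shown in the diff; not the Nemo class.
final class BundleTracker {
  private final long bundleSize;
  private final long bundleMillis;
  private final LongSupplier clock; // injected instead of System.currentTimeMillis() for testability
  private final Runnable startBundle;
  private final Runnable finishBundle;

  private boolean bundleFinished = true;
  private long prevBundleStartTime;
  private long currBundleCount;

  BundleTracker(final long bundleSize, final long bundleMillis, final LongSupplier clock,
                final Runnable startBundle, final Runnable finishBundle) {
    this.bundleSize = bundleSize;
    this.bundleMillis = bundleMillis;
    this.clock = clock;
    this.startBundle = startBundle;
    this.finishBundle = finishBundle;
  }

  /** Starts a bundle if none is open, then counts the incoming element. */
  void checkAndInvokeBundle() {
    if (bundleFinished) {
      bundleFinished = false;
      startBundle.run();
      prevBundleStartTime = clock.getAsLong();
      currBundleCount = 0;
    }
    currBundleCount += 1;
  }

  /** Finishes the open bundle once it is large enough or old enough. */
  void checkAndFinishBundle() {
    if (!bundleFinished
        && (currBundleCount >= bundleSize || clock.getAsLong() - prevBundleStartTime >= bundleMillis)) {
      bundleFinished = true;
      finishBundle.run();
    }
  }

  /** Finishes the open bundle regardless of size or age (assumed: a no-op when none is open). */
  void forceFinishBundle() {
    if (!bundleFinished) {
      bundleFinished = true;
      finishBundle.run();
    }
  }
}
```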




[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235247750
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@

[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249217
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
 ##
 @@ -104,6 +104,7 @@ private boolean isWatermarkTriggerTime() {
   private Object retrieveElement() throws NoSuchElementException, IOException {
 // Emit watermark
 if (!bounded && isWatermarkTriggerTime()) {
+  // index=0 as there is only 1 input stream
 
 Review comment:
   please remove this comment 
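
The `SourceVertexDataFetcher` hunk emits a watermark only for unbounded sources, and only when `isWatermarkTriggerTime()` returns true. That method's body is not part of this diff. Below is a plausible time-based sketch, assuming a fixed period that restarts each time a watermark is due. `WatermarkTrigger`, `periodMillis` and `shouldEmitWatermark` are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a time-based watermark trigger; the real isWatermarkTriggerTime() is not shown.
final class WatermarkTrigger {
  private final long periodMillis;
  private final LongSupplier clock; // injected for deterministic testing
  private long prevWatermarkTime;

  WatermarkTrigger(final long periodMillis, final LongSupplier clock) {
    this.periodMillis = periodMillis;
    this.clock = clock;
    this.prevWatermarkTime = clock.getAsLong();
  }

  /** Returns true at most once per period and restarts the period when it does. */
  boolean isWatermarkTriggerTime() {
    final long now = clock.getAsLong();
    if (now - prevWatermarkTime >= periodMillis) {
      prevWatermarkTime = now;
      return true;
    }
    return false;
  }

  /** Mirrors the guard in the diff: bounded sources skip periodic watermarks. */
  boolean shouldEmitWatermark(final boolean bounded) {
    return !bounded && isWatermarkTriggerTime();
  }
}
```

Because `!bounded && isWatermarkTriggerTime()` short-circuits, a bounded source never advances the timer.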




[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249002
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@

[GitHub] taegeonum commented on a change in pull request #159: [NEMO-216, 251, 259] Support side inputs and windowing

2018-11-20 Thread GitBox
taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r235249018
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
 ##
 @@ -0,0 +1,160 @@
