[GitHub] taegeonum commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
taegeonum commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229895793
 
 

 ##
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    super();
+    this.watermarks = new Watermark[numEdges];
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks to the minimum value because
+    // we should not emit a watermark until all edges have emitted watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks[i] = new Watermark(Long.MIN_VALUE);
+    }
+  }
+
+  private int findNextMinWatermarkIndex() {
 
 Review comment:
   This returns an index, so we cannot use `Collections.min`.
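As the reply notes, `Collections.min` returns the smallest element itself, not its position, while the manager needs the position (`minWatermarkIndex`). A minimal sketch of an index-returning alternative built only from the JDK, assuming watermarks are reduced to plain `long` timestamps (`MinIndexDemo` and `argMin` are hypothetical names):

```java
import java.util.Comparator;
import java.util.stream.IntStream;

public final class MinIndexDemo {
  /**
   * Returns the index of the smallest timestamp, or -1 for an empty array.
   * (Collections.min would hand back the smallest value, not its index.)
   */
  static int argMin(final long[] timestamps) {
    return IntStream.range(0, timestamps.length)
        .boxed()
        .min(Comparator.comparingLong(i -> timestamps[i]))
        .orElse(-1);
  }

  public static void main(final String[] args) {
    System.out.println(argMin(new long[] {1800L, 500L, 1000L})); // prints 1
  }
}
```

Which index wins on a tie is left to the stream reduction here, whereas the explicit loop in the PR (strict `<`) always keeps the first minimum, which is one argument for keeping the loop.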


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] taegeonum commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
taegeonum commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229897058
 
 

 ##
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    super();
+    this.watermarks = new Watermark[numEdges];
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks to the minimum value because
+    // we should not emit a watermark until all edges have emitted watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks[i] = new Watermark(Long.MIN_VALUE);
+    }
+  }
+
+  private int findNextMinWatermarkIndex() {
+    int index = -1;
+    long timestamp = Long.MAX_VALUE;
+    for (int i = 0; i < watermarks.length; i++) {
+      if (watermarks[i].getTimestamp() < timestamp) {
+        index = i;
+        timestamp = watermarks[i].getTimestamp();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+    if (edgeIndex == minWatermarkIndex) {
+      // update min watermark
+      final Watermark prevMinWatermark = watermarks[minWatermarkIndex];
+      watermarks[minWatermarkIndex] = watermark;
+      // find min watermark
+      minWatermarkIndex = findNextMinWatermarkIndex();
+      final Watermark minWatermark = watermarks[minWatermarkIndex];
+      assert minWatermark.getTimestamp() >= prevMinWatermark.getTimestamp();
 
 Review comment:
   Assertions are for debugging and do not affect performance, and I think this one makes the code easier to understand. Is there any reason why Nemo does not use assertions in code?
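Part of the answer is JVM behavior: assertions are disabled by default, so an `assert` statement is skipped entirely unless the process is started with `-ea` (or `-enableassertions`). The idiom below, adapted from Oracle's "Programming with Assertions" guide, reports which mode the current JVM is in; the class and method names are hypothetical.

```java
public final class AssertionStatusDemo {
  /** Returns true only if assertions are enabled for this class (e.g. the JVM ran with -ea). */
  static boolean assertionsEnabled() {
    boolean enabled = false;
    // Intentional side effect: this assignment only executes when assertions are enabled.
    assert enabled = true;
    return enabled;
  }

  public static void main(final String[] args) {
    System.out.println("assertions enabled: " + assertionsEnabled());
  }
}
```

Running it once with `java AssertionStatusDemo` and once with `java -ea AssertionStatusDemo` shows the difference; a check written with `assert` in the watermark manager would likewise be a no-op in a JVM started without `-ea`.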




[GitHub] johnyangk closed pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk closed pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 1675e9c0b..3a81df7f0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,17 +235,17 @@ under the License.
 maven-javadoc-plugin
 3.0.0
 
-
*.org.apache.nemo.runtime.common.comm
-docs/apidocs
-docs/apidocs
+  
*.org.apache.nemo.runtime.common.comm
+  docs/apidocs
+  docs/apidocs
 
 
 
-aggregate
-
-aggregate
-
-site
+  aggregate
+  
+  aggregate
+  
+  site
 
 
   test-javadoc
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
index ffbbe56b1..a433f3a2c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -141,8 +141,8 @@ public void notifyMaster(final String runtimeEdgeId, final long srcTaskIndex) {
   /**
    * (SYNCHRONIZATION) Called by task threads.
    *
-   * @param runtimeEdge
-   * @param srcTaskIndex
+   * @param runtimeEdge runtime edge
+   * @param srcTaskIndex source task index
    * @return output contexts.
    */
   public List getOutputContexts(final RuntimeEdge runtimeEdge,
@@ -163,8 +163,8 @@ public Serializer getSerializer(final String runtimeEdgeId) {
   /**
    * (SYNCHRONIZATION) Called by network threads.
    *
-   * @param outputContext
-   * @throws InvalidProtocolBufferException
+   * @param outputContext output context
+   * @throws InvalidProtocolBufferException protobuf exception
    */
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
     final ControlMessage.PipeTransferContextDescriptor descriptor =
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 3f1bc9074..56c754038 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -34,6 +34,7 @@
 
   /**
    * It forwards output to the next operator.
+   * @param nextOperatorVertex next operator to emit data and watermark
    */
   public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
     this.nextOperatorVertex = nextOperatorVertex;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
new file mode 100644
index 0..66fb7aa81
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * An interface for tracking input watermarks among multiple input streams.
+ * --edge 1--
+ * --edge 2--  watermarkManager --(emitWatermark)-- nextOperator
+ * --edge 3--
+ */
+public 
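The interface comment describes several input edges feeding one manager, which forwards the minimum watermark to the next operator. A minimal, self-contained sketch of that min-tracking idea, assuming watermarks are plain `long` timestamps, the next operator is a `LongConsumer`, and each edge's watermarks never decrease (`MinWatermarkTracker` and `trackAndEmit` are hypothetical names, not the PR's API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical simplified stand-in for a multi-input watermark manager. */
public final class MinWatermarkTracker {
  private final long[] watermarks;
  private final LongConsumer nextOperator;
  private int minIndex = 0;

  public MinWatermarkTracker(final int numEdges, final LongConsumer nextOperator) {
    this.watermarks = new long[numEdges];
    // Every edge starts at Long.MIN_VALUE, so nothing is emitted until all edges have reported.
    Arrays.fill(watermarks, Long.MIN_VALUE);
    this.nextOperator = nextOperator;
  }

  /** Records a watermark from one edge and emits the overall minimum if it advanced. */
  public void trackAndEmit(final int edgeIndex, final long watermark) {
    final long prevMin = watermarks[minIndex];
    watermarks[edgeIndex] = watermark;
    if (edgeIndex != minIndex) {
      return; // a non-minimum edge moved forward; the overall minimum cannot change
    }
    minIndex = argMin();
    final long newMin = watermarks[minIndex];
    if (newMin > prevMin) {
      nextOperator.accept(newMin);
    }
  }

  private int argMin() {
    int index = 0;
    for (int i = 1; i < watermarks.length; i++) {
      if (watermarks[i] < watermarks[index]) {
        index = i;
      }
    }
    return index;
  }

  public static void main(final String[] args) {
    final List<Long> emitted = new ArrayList<>();
    final MinWatermarkTracker tracker = new MinWatermarkTracker(2, emitted::add);
    tracker.trackAndEmit(0, 500L);  // edge 1 is still at MIN_VALUE: nothing emitted
    tracker.trackAndEmit(1, 1000L); // minimum rises to 500
    tracker.trackAndEmit(0, 1800L); // minimum rises to 1000
    System.out.println(emitted);    // prints [500, 1000]
  }
}
```

Only an update on the current minimum edge can raise the overall minimum, so updates on other edges are simply recorded; this mirrors the `edgeIndex == minWatermarkIndex` check in the PR's `trackAndEmitWatermarks`.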


[GitHub] taegeonum closed pull request #138: [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark

2018-10-31 Thread GitBox
taegeonum closed pull request #138: [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark
URL: https://github.com/apache/incubator-nemo/pull/138
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
new file mode 100644
index 0..a9b0da3ce
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.task;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.*;
+
+/**
+ * Task thread -> fetchDataElement() -> (((QUEUE))) <- List of iterators <- queueInsertionThreads
+ *
+ * Unlike {@link ParentTaskDataFetcher}, where the task thread directly consumes (and blocks on) iterators one by one,
+ * this class spawns threads that each forward elements from an iterator to a global queue.
+ *
+ * This class should be used when dealing with unbounded data streams, as we do not want to be blocked on a
+ * single unbounded iterator forever.
+ */
+@NotThreadSafe
+class MultiThreadParentTaskDataFetcher extends DataFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadParentTaskDataFetcher.class);
+
+  private final InputReader readersForParentTask;
+  private final ExecutorService queueInsertionThreads;
+
+  // Non-finals (lazy fetching)
+  private boolean firstFetch = true;
+
+  private final ConcurrentLinkedQueue elementQueue;
+
+  private long serBytes = 0;
+  private long encodedBytes = 0;
+
+  private int numOfIterators;
+  private int numOfFinishMarks = 0;
+
+  MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
+                                   final InputReader readerForParentTask,
+                                   final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
+    this.readersForParentTask = readerForParentTask;
+    this.firstFetch = true;
+    this.elementQueue = new ConcurrentLinkedQueue();
+    this.queueInsertionThreads = Executors.newCachedThreadPool();
+  }
+
+  @Override
+  Object fetchDataElement() throws IOException, NoSuchElementException {
+    if (firstFetch) {
+      fetchDataLazily();
+      firstFetch = false;
+    }
+
+    while (true) {
+      final Object element = elementQueue.poll();
+      if (element == null) {
+        throw new NoSuchElementException();
+      } else if (element instanceof Finishmark) {
+        numOfFinishMarks++;
+        if (numOfFinishMarks == numOfIterators) {
+          return Finishmark.getInstance();
+        }
+        // else try again.
+      } else {
+        return element;
+      }
+    }
+  }
+
+  private void fetchDataLazily() {
+    final List> futures = readersForParentTask.read();
+    numOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+      // A thread for each iterator
+      queueInsertionThreads.submit(() -> {
+        if (exception == null) {
+          // Consume this iterator to the end.
+          while (iterator.hasNext()) { // blocked on the iterator.
+            final Object element = iterator.next();
+            elementQueue.offer(element);
+          }
+
+          // This iterator is finished.
+  
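The class comment describes a fan-in: one forwarding thread per iterator pushes elements into a shared queue, and the task thread polls that queue, counting finish marks until every iterator is done. A stripped-down sketch of the same pattern using only `java.util.concurrent`, where the hypothetical `FINISH` object stands in for Nemo's `Finishmark` and finite in-memory iterators stand in for the input readers:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class QueueFanInDemo {
  /** Hypothetical end-of-iterator marker standing in for Nemo's Finishmark. */
  static final Object FINISH = new Object();

  static List<Object> drain(final List<? extends Iterator<?>> iterators) throws InterruptedException {
    final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    final ExecutorService forwarders = Executors.newCachedThreadPool();
    for (final Iterator<?> iterator : iterators) {
      // One forwarding task per iterator, so a slow iterator cannot stall the others.
      forwarders.submit(() -> {
        while (iterator.hasNext()) {
          queue.offer(iterator.next());
        }
        queue.offer(FINISH); // this iterator is exhausted
      });
    }
    forwarders.shutdown();

    final List<Object> out = new ArrayList<>();
    int finishMarks = 0;
    // Task-thread side: keep polling until every iterator has sent its finish marker.
    while (finishMarks < iterators.size()) {
      final Object element = queue.poll();
      if (element == null) {
        Thread.sleep(1); // empty for now; the real fetcher throws NoSuchElementException instead
      } else if (element == FINISH) {
        finishMarks++;
      } else {
        out.add(element);
      }
    }
    forwarders.awaitTermination(1, TimeUnit.SECONDS);
    return out;
  }

  public static void main(final String[] args) throws InterruptedException {
    final List<Object> out = drain(Arrays.asList(
        Arrays.asList(1, 2, 3).iterator(),
        Arrays.asList(4, 5).iterator()));
    System.out.println(out.size()); // prints 5; the interleaving of elements may vary
  }
}
```

Unlike `fetchDataElement()`, which hands back one element per call, this sketch drains everything in one loop, and it omits error handling: if an iterator threw, its `FINISH` would never arrive and the loop would keep polling.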

[GitHub] jooykim opened a new pull request #140: [NEMO-254] Modify POM to publish maven artifacts

2018-10-31 Thread GitBox
jooykim opened a new pull request #140: [NEMO-254] Modify POM to publish maven artifacts
URL: https://github.com/apache/incubator-nemo/pull/140
 
 
   JIRA: [NEMO-254: Modify POM to publish maven artifacts](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-254)
   
   **Major changes:**
   - Modifies the current pom.xml to inherit the Apache POM 
   
   **Minor changes to note:**
   - N/A
   
   **Tests for the changes:**
   - N/A (The build suffices)
   
   **Other comments:**
   - This PR makes our POM inherit the Apache POM, which facilitates publishing our releases as Maven artifacts
   
   
   




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229589941
 
 

 ##
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
 
 Review comment:
   Maybe use an ArrayList with the size initialized to `numEdges`?
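One detail to watch with the `ArrayList` suggestion: `new ArrayList<>(numEdges)` only sets the initial capacity, so the list is still empty and `set(i, ...)` would throw `IndexOutOfBoundsException`. A minimal sketch of pre-filling the list instead, with `Long` timestamps standing in for `Watermark` objects (`WatermarkListDemo` and `initialWatermarks` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class WatermarkListDemo {
  /** Returns a list that already holds numEdges entries, each set to Long.MIN_VALUE. */
  static List<Long> initialWatermarks(final int numEdges) {
    // nCopies supplies the elements; new ArrayList<>(numEdges) alone would only reserve capacity.
    return new ArrayList<>(Collections.nCopies(numEdges, Long.MIN_VALUE));
  }

  public static void main(final String[] args) {
    System.out.println(new ArrayList<Long>(3).size()); // prints 0: capacity is not size

    final List<Long> watermarks = initialWatermarks(3);
    watermarks.set(1, 1000L); // fine: index 1 already exists
    System.out.println(watermarks); // prints [-9223372036854775808, 1000, -9223372036854775808]
  }
}
```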




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596747
 
 

 ##
 File path: runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+final List emittedWatermarks = new LinkedList<>();
+final Transform transform = mock(Transform.class);
+doAnswer(new Answer() {
 
 Review comment:
   Answer with the `Void` type?
   
https://stackoverflow.com/questions/2276271/how-to-make-mock-to-void-methods-with-mockito
   
   




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229598008
 
 

 ##
 File path: runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 ##
 @@ -330,6 +261,74 @@ public void testParentTaskDataFetching() throws Exception {
     assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+  /**
+   * The DAG of the task to test will look like:
+   * source1 -> vertex1 -> vertex2
+   * source2 -> vertex3 ->
+   *
+   * vertex2 has two incoming edges (from vertex1 and vertex3),
+   * and we test whether TaskExecutor handles data and watermarks correctly in this situation.
+   *
+   * source1 emits watermarks 500 and 1800, and source2 emits watermark 1000.
+   * vertex2 should receive and emit watermarks 500 and 1000.
 
 Review comment:
   If watermarks come in this order
   source1: 500
   source1: 1800
   source2: 1000
   
   Shouldn't there be only a single emission of 1000?
   
   Can you comment on the assumptions made in this test, and how the assumptions are ensured?
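To make the ordering question concrete, the sketch below replays both arrival orders through a naive min-tracker that recomputes the minimum over all edges on every event and emits whenever it strictly increases. It assumes edge 0 carries source1's watermarks and edge 1 carries source2's; `WatermarkOrderDemo` and `emitted` are hypothetical names, and this is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class WatermarkOrderDemo {
  /**
   * Replays {edgeIndex, timestamp} events and returns what a min-tracking manager would emit:
   * the minimum over all edges, whenever that minimum strictly increases.
   */
  static List<Long> emitted(final int numEdges, final long[][] events) {
    final long[] latest = new long[numEdges];
    Arrays.fill(latest, Long.MIN_VALUE); // no edge has reported yet
    long lastMin = Long.MIN_VALUE;
    final List<Long> out = new ArrayList<>();
    for (final long[] event : events) {
      latest[(int) event[0]] = event[1];
      final long min = Arrays.stream(latest).min().getAsLong();
      if (min > lastMin) {
        out.add(min);
        lastMin = min;
      }
    }
    return out;
  }

  public static void main(final String[] args) {
    // Order from the review comment: source1:500, source1:1800, source2:1000.
    System.out.println(emitted(2, new long[][] {{0, 500}, {0, 1800}, {1, 1000}})); // prints [1000]
    // Alternative order: source1:500, source2:1000, source1:1800.
    System.out.println(emitted(2, new long[][] {{0, 500}, {1, 1000}, {0, 1800}})); // prints [500, 1000]
  }
}
```

Under these assumptions the test's expectation of 500 and 1000 holds only if source2's 1000 reaches vertex2 before source1's 1800; in the order listed in the comment, only 1000 is emitted.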




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596058
 
 

 ##
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 ##
 @@ -478,17 +511,24 @@ private boolean handleDataFetchers(final List fetchers) {
 return map;
   }
 
-  private Map> getInternalAdditionalOutputMap(
+  private Map> getInternalAdditionalOutputMap(
 
 Review comment:
   Maybe merge `getInternalAdditionalOutputMap` and `getInternalMainOutputs`, and remove the intermediate `edgeIndexMap`?
   
   Pair merged() {
 internal = new;
 additional = new;
   
 for (outedge) {
   if ... 
   then internal.add(index, ...) 
   else additional.add(index, ...)
 }
   }
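A rough Java rendering of the single-pass idea above, assuming a hypothetical `OutEdge` type where a null tag marks a main output. The real method's element types are not visible in this excerpt, so the sketch records edge indices only and uses `Map.Entry` as a stand-in for a pair type; `OutputSplitDemo` and `split` are hypothetical names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class OutputSplitDemo {
  /** Hypothetical out-edge: a null tag marks a main output, otherwise an additional output. */
  static final class OutEdge {
    final String additionalTag;

    OutEdge(final String additionalTag) {
      this.additionalTag = additionalTag;
    }
  }

  /**
   * Single pass over the out-edges. Each edge's position in the list serves as its
   * index, so no intermediate edge-to-index map is needed.
   */
  static Map.Entry<List<Integer>, Map<String, List<Integer>>> split(final List<OutEdge> outEdges) {
    final List<Integer> main = new ArrayList<>();
    final Map<String, List<Integer>> additional = new HashMap<>();
    for (int index = 0; index < outEdges.size(); index++) {
      final String tag = outEdges.get(index).additionalTag;
      if (tag == null) {
        main.add(index);
      } else {
        additional.computeIfAbsent(tag, t -> new ArrayList<>()).add(index);
      }
    }
    return new AbstractMap.SimpleImmutableEntry<>(main, additional);
  }

  public static void main(final String[] args) {
    final Map.Entry<List<Integer>, Map<String, List<Integer>>> result = split(Arrays.asList(
        new OutEdge(null), new OutEdge("side"), new OutEdge(null)));
    System.out.println(result.getKey());   // prints [0, 2]
    System.out.println(result.getValue()); // prints {side=[1]}
  }
}
```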




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596994
 
 

 ##
 File path: runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+final List emittedWatermarks = new LinkedList<>();
 
 Review comment:
   Maybe a singleton Watermark initialized to -1?
   (Since the array size is always kept to 1 in the tests)
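A sketch of the single-slot approach, assuming the test only needs the most recently emitted watermark: an `AtomicLong` initialized to -1 plays the role of the singleton, and a `LongConsumer` stands in for the mocked emit callback. Both are hypothetical stand-ins for the test's `Watermark` list and Mockito answer, as are the class and method names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public final class LastWatermarkDemo {
  /** Feeds the given watermarks to a callback and returns the last one seen, or -1 if none. */
  static long lastEmitted(final long... watermarks) {
    // One mutable slot; the lambda may capture it because the reference is effectively final.
    final AtomicLong last = new AtomicLong(-1L);
    final LongConsumer emit = last::set; // hypothetical emit callback
    for (final long watermark : watermarks) {
      emit.accept(watermark);
    }
    return last.get();
  }

  public static void main(final String[] args) {
    System.out.println(lastEmitted());            // prints -1
    System.out.println(lastEmitted(500L, 1000L)); // prints 1000
  }
}
```

A list is still the better fit if a test must also check how many watermarks were emitted, for example to catch a duplicate emission.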




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229590607
 
 

 ##
 File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    super();
+    this.watermarks = new Watermark[numEdges];
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks as min value because
+    // we should not emit watermark until all edges emit watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks[i] = new Watermark(Long.MIN_VALUE);
+    }
+  }
+
+  private int findNextMinWatermarkIndex() {
+    int index = -1;
+    long timestamp = Long.MAX_VALUE;
+    for (int i = 0; i < watermarks.length; i++) {
+      if (watermarks[i].getTimestamp() < timestamp) {
+        index = i;
+        timestamp = watermarks[i].getTimestamp();
+      }
+    }
+    return index;
+  }
+
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+    if (edgeIndex == minWatermarkIndex) {
+      // update min watermark
+      final Watermark prevMinWatermark = watermarks[minWatermarkIndex];
+      watermarks[minWatermarkIndex] = watermark;
+      // find min watermark
+      minWatermarkIndex = findNextMinWatermarkIndex();
+      final Watermark minWatermark = watermarks[minWatermarkIndex];
+      assert minWatermark.getTimestamp() >= prevMinWatermark.getTimestamp();
 
 Review comment:
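   In Nemo, `assert` statements only take effect in unit tests: the JVM skips them unless assertions are explicitly enabled (e.g., with `-ea`), which is not the case when Nemo runs normally.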
   
   https://stackoverflow.com/questions/7865241/assertion-not-working
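   A minimal sketch of the usual replacement for `assert`: an explicit check that throws `IllegalStateException`, so the invariant is enforced regardless of JVM flags. `MinWatermarkTracker` is a hypothetical class, not part of Nemo; it works on raw `long` timestamps instead of Nemo's `Watermark` and, for simplicity, recomputes the minimum on every call rather than only when the current-minimum edge reports.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical, simplified min-watermark tracker (not Nemo's MultiInputWatermarkManager).
 * Timestamps are plain longs; Long.MIN_VALUE means "this edge has not reported yet".
 */
final class MinWatermarkTracker {
  private final long[] watermarks;
  private long lastEmitted = Long.MIN_VALUE;

  MinWatermarkTracker(final int numEdges) {
    this.watermarks = new long[numEdges];
    // Nothing is emitted until every edge has reported at least once.
    Arrays.fill(watermarks, Long.MIN_VALUE);
  }

  /**
   * Records the watermark of one input edge.
   * Returns the new minimum across all edges if it advanced, otherwise empty.
   */
  OptionalLong track(final int edgeIndex, final long timestamp) {
    watermarks[edgeIndex] = timestamp;
    long min = Long.MAX_VALUE;
    for (final long w : watermarks) {
      min = Math.min(min, w);
    }
    // Explicit check instead of `assert`: this runs whether or not the JVM was started with -ea.
    if (min < lastEmitted) {
      throw new IllegalStateException(
          "Minimum watermark went backwards: " + min + " < " + lastEmitted);
    }
    if (min > lastEmitted) {
      lastEmitted = min;
      return OptionalLong.of(min);
    }
    return OptionalLong.empty();
  }
}
```

   For example, with two edges, tracking 10 on edge 0 emits nothing (edge 1 is still at `Long.MIN_VALUE`); tracking 20 on edge 1 then emits 10, and tracking 30 on edge 0 emits 20.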




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229599612
 
 

 ##
 File path: runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 ##
 @@ -601,6 +600,91 @@ public void close() {
 }
   }
 
+  /**
+   * Source vertex for unbounded source test.
+   */
+  private final class TestUnboundedSourceVertex extends SourceVertex {
+
+    @Override
+    public boolean isBounded() {
+      return false;
+    }
+
+    @Override
+    public List getReadables(int desiredNumOfSplits) throws Exception {
+      return null;
+    }
+
+    @Override
+    public void clearInternalStates() {
+
+    }
+
+    @Override
+    public IRVertex getClone() {
+      return null;
+    }
+  }
+
+  private final class TestUnboundedSourceReadable implements Readable {
+    int pointer = 0;
+    final int middle = elements.size() / 2;
+    final int end = elements.size();
+    boolean watermarkEmitted = false;
+    final List watermarks;
+    int numEmittedWatermarks = 0;
+
+    public TestUnboundedSourceReadable(final List watermarks) {
+      this.watermarks = watermarks;
+    }
+
+    @Override
+    public void prepare() {
+
+    }
+
+    // This emulates unbounded source that throws NoSuchElementException
+    // It reads current data until middle point and throws NoSuchElementException at the middle point.
+    // It resumes the data reading after emitting a watermark, and finishes at the end of the data.
+    @Override
+    public Object readCurrent() throws NoSuchElementException {
+      if (pointer == middle && !watermarkEmitted) {
 
 Review comment:
   Could you explain in more detail (in the code comment) why `!watermarkEmitted` is needed here?
   `readCurrent` and watermark emission seem like independent operations to me.
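   One possible reading of the `!watermarkEmitted` guard, shown as a hypothetical, self-contained sketch: `PausingReadable` is not Nemo's `Readable` interface, and `emitWatermark` is an invented hook standing in for whatever emits the watermark in the test. In this sketch the guard is what lets reading resume: the exception is thrown without advancing `pointer`, so without the flag every later call would hit the same branch and the source would never get past the middle. Whether the PR's test relies on the same mechanism depends on code not shown in the hunk.

```java
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical readable that emulates an unbounded source: it pauses once at the
 * middle of its data and resumes only after a watermark has been emitted.
 */
final class PausingReadable {
  private final List<Integer> elements;
  private final int middle;
  private int pointer = 0;
  private boolean watermarkEmitted = false;

  PausingReadable(final List<Integer> elements) {
    this.elements = elements;
    this.middle = elements.size() / 2;
  }

  /** Returns the next element, or throws if no data is currently available. */
  Integer readCurrent() throws NoSuchElementException {
    // The pointer is not advanced here, so without the flag this branch would
    // fire on every subsequent call and reading would never resume.
    if (pointer == middle && !watermarkEmitted) {
      throw new NoSuchElementException("No data available until a watermark is emitted");
    }
    if (pointer >= elements.size()) {
      throw new NoSuchElementException("End of data");
    }
    return elements.get(pointer++);
  }

  /** Hypothetical hook: the caller emits a watermark after seeing the pause. */
  void emitWatermark() {
    watermarkEmitted = true;
  }

  boolean isFinished() {
    return pointer >= elements.size();
  }
}
```

   Reading `[1, 2, 3, 4]` yields 1 and 2, then throws at the middle; after `emitWatermark()` it yields 3 and 4, and `isFinished()` becomes true.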

