[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229599612
 
 

 ##
 File path: 
runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 ##
 @@ -601,6 +600,91 @@ public void close() {
 }
   }
 
+  /**
+   * Source vertex for unbounded source test.
+   */
+  private final class TestUnboundedSourceVertex extends SourceVertex {
+
+@Override
+public boolean isBounded() {
+  return false;
+}
+
+@Override
+public List getReadables(int desiredNumOfSplits) throws Exception {
+  return null;
+}
+
+@Override
+public void clearInternalStates() {
+
+}
+
+@Override
+public IRVertex getClone() {
+  return null;
+}
+  }
+
+  private final class TestUnboundedSourceReadable implements Readable {
+int pointer = 0;
+final int middle = elements.size() / 2;
+final int end = elements.size();
+boolean watermarkEmitted = false;
+final List watermarks;
+int numEmittedWatermarks = 0;
+
+public TestUnboundedSourceReadable(final List watermarks) {
+  this.watermarks = watermarks;
+}
+
+@Override
+public void prepare() {
+
+}
+
+// This emulates an unbounded source that throws NoSuchElementException.
+// It reads the current data until the middle point and throws NoSuchElementException at the middle point.
+// It resumes reading data after emitting a watermark, and finishes at the end of the data.
+@Override
+public Object readCurrent() throws NoSuchElementException {
+  if (pointer == middle && !watermarkEmitted) {
 
 Review comment:
   Can you explain in a bit more detail (in the comment) why `!watermarkEmitted` is needed here?
   `readCurrent` and the watermark seem to be independent operations to me.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596058
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
 ##
 @@ -478,17 +511,24 @@ private boolean handleDataFetchers(final List fetchers) {
 return map;
   }
 
-  private Map> getInternalAdditionalOutputMap(
+  private Map> getInternalAdditionalOutputMap(
 
 Review comment:
   Maybe merge `getInternalAdditionalOutputMap` and `getInternalMainOutputs`, and remove the intermediate `edgeIndexMap`?
   
   Pair merged() {
 internal = new;
 additional = new;
   
 for (outedge) {
   if ... 
   then internal.add(index, ...) 
   else additional.add(index, ...)
 }
   }
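
The pseudocode above can be sketched in Java roughly as follows. This is only an illustration under stated assumptions: `OutputPartitioner`, `OutEdge`, `Outputs`, and the rule that a `null` tag marks a main output are all hypothetical names and conventions, not Nemo's actual types. The point is that a single pass over the outgoing edges fills both collections, so no intermediate index map is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OutputPartitioner {
  /** Hypothetical stand-in for an outgoing edge; not a Nemo class. */
  static final class OutEdge {
    final String dstVertexId;
    final String additionalTag; // assumption: null marks a main output

    OutEdge(final String dstVertexId, final String additionalTag) {
      this.dstVertexId = dstVertexId;
      this.additionalTag = additionalTag;
    }
  }

  /** Carries both results, playing the role of the Pair in the pseudocode. */
  static final class Outputs {
    final List<String> mainOutputs = new ArrayList<>();
    final Map<String, List<String>> additionalOutputs = new HashMap<>();
  }

  /** Splits the outgoing edges into main and additional outputs in one traversal. */
  static Outputs partition(final List<OutEdge> outEdges) {
    final Outputs outputs = new Outputs();
    for (final OutEdge edge : outEdges) {
      if (edge.additionalTag == null) {
        outputs.mainOutputs.add(edge.dstVertexId);
      } else {
        // Group additional outputs by tag, creating the list on first use.
        outputs.additionalOutputs
            .computeIfAbsent(edge.additionalTag, tag -> new ArrayList<>())
            .add(edge.dstVertexId);
      }
    }
    return outputs;
  }

  public static void main(final String[] args) {
    final List<OutEdge> edges = new ArrayList<>();
    edges.add(new OutEdge("vertex2", null));
    edges.add(new OutEdge("vertex3", "side"));
    final Outputs outputs = partition(edges);
    System.out.println(outputs.mainOutputs);
    System.out.println(outputs.additionalOutputs);
  }
}
```

Returning one small holder object keeps the two results in sync, since both come from the same loop over the same edges.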




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229590607
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+final OperatorVertex nextOperator) {
+super();
+this.watermarks = new Watermark[numEdges];
+this.nextOperator = nextOperator;
+this.minWatermarkIndex = 0;
+// We initialize watermarks as min value because
+// we should not emit watermark until all edges emit watermarks.
+for (int i = 0; i < numEdges; i++) {
+  watermarks[i] = new Watermark(Long.MIN_VALUE);
+}
+  }
+
+  private int findNextMinWatermarkIndex() {
+int index = -1;
+long timestamp = Long.MAX_VALUE;
+for (int i = 0; i < watermarks.length; i++) {
+  if (watermarks[i].getTimestamp() < timestamp) {
+index = i;
+timestamp = watermarks[i].getTimestamp();
+  }
+}
+return index;
+  }
+
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+if (edgeIndex == minWatermarkIndex) {
+  // update min watermark
+  final Watermark prevMinWatermark = watermarks[minWatermarkIndex];
+  watermarks[minWatermarkIndex] = watermark;
+   // find min watermark
+  minWatermarkIndex = findNextMinWatermarkIndex();
+  final Watermark minWatermark = watermarks[minWatermarkIndex];
+  assert minWatermark.getTimestamp() >= prevMinWatermark.getTimestamp();
 
 Review comment:
   In Nemo, `assert` has no effect outside of unit tests, because Java assertions are disabled unless the JVM is started with `-ea`.
   
   https://stackoverflow.com/questions/7865241/assertion-not-working
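
One alternative that does not depend on JVM flags is an explicit check that throws. A minimal sketch, assuming a hypothetical helper `checkMonotonic` and plain `long` timestamps in place of Nemo's `Watermark` class:

```java
final class WatermarkChecks {
  private WatermarkChecks() {
  }

  /**
   * Fails if the new minimum watermark is older than the previous one.
   * Unlike an assert statement, this check always runs.
   */
  static void checkMonotonic(final long prevMinTimestamp, final long newMinTimestamp) {
    if (newMinTimestamp < prevMinTimestamp) {
      throw new IllegalStateException(
          "The minimum watermark went backwards: " + prevMinTimestamp + " -> " + newMinTimestamp);
    }
  }

  public static void main(final String[] args) {
    checkMonotonic(500L, 1000L); // passes silently
    try {
      checkMonotonic(1000L, 500L);
    } catch (final IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Whether `IllegalStateException` is the right exception type for the project is a judgment call; the sketch only shows the shape of the check.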




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596747
 
 

 ##
 File path: 
runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+final List emittedWatermarks = new LinkedList<>();
+final Transform transform = mock(Transform.class);
+doAnswer(new Answer() {
 
 Review comment:
   Answer with the `Void` type?
   
https://stackoverflow.com/questions/2276271/how-to-make-mock-to-void-methods-with-mockito
   
   




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229589941
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final Watermark[] watermarks;
 
 Review comment:
   Maybe use an ArrayList with the size initialized to `numEdges`?
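
One detail to keep in mind with this approach: `new ArrayList<>(numEdges)` only reserves capacity and returns an empty list, so the entries still have to be added before `set(i, ...)` can be used. A minimal sketch, assuming `long` timestamps as a stand-in for `Watermark` objects and a hypothetical helper named `initialWatermarks`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class WatermarkListInit {
  private WatermarkListInit() {
  }

  /** Builds a mutable list holding numEdges entries, each set to Long.MIN_VALUE. */
  static List<Long> initialWatermarks(final int numEdges) {
    // nCopies yields an immutable list of numEdges entries; copying it makes it mutable.
    return new ArrayList<>(Collections.nCopies(numEdges, Long.MIN_VALUE));
  }

  public static void main(final String[] args) {
    final List<Long> reservedOnly = new ArrayList<>(3);
    System.out.println(reservedOnly.size()); // capacity is reserved, but the list is still empty

    final List<Long> watermarks = initialWatermarks(3);
    watermarks.set(1, 500L); // safe, because index 1 already exists
    System.out.println(watermarks.size());
  }
}
```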




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229598008
 
 

 ##
 File path: 
runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
 ##
 @@ -330,6 +261,74 @@ public void testParentTaskDataFetching() throws Exception {
 assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+  /**
+   * The DAG of the task to test looks like:
+   * source1 -> vertex1 -> vertex2
+   * source2 -> vertex3 ->
+   *
+   * The vertex2 has two incoming edges (from vertex1 and vertex3)
+   * and we test if TaskExecutor handles data and watermarks correctly in this situation.
+   *
+   * The source1 emits watermark 500, 1800 and source2 emits watermark 1000.
+   * The vertex2 should receive and emit watermarks 500 and 1000.
 
 Review comment:
   If watermarks come in this order
   source1: 500
   source1: 1800
   source2: 1000
   
   Shouldn't there be only a single emission of 1000?
   
   Can you comment on the assumptions made in this test, and how the assumptions are ensured?
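
To make the ordering question concrete, here is a minimal sketch of min-watermark tracking over two input edges. It uses plain `long` timestamps and a hypothetical `MinWatermarkTracker` class rather than the PR's `MultiInputWatermarkManager`, and it assumes a watermark is emitted only when the minimum across all edges advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MinWatermarkTracker {
  private final long[] latest; // latest watermark seen on each input edge
  private long lastEmitted = Long.MIN_VALUE;
  private final List<Long> emitted = new ArrayList<>();

  MinWatermarkTracker(final int numEdges) {
    latest = new long[numEdges];
    // No edge has reported yet, so every entry starts at the minimum value.
    Arrays.fill(latest, Long.MIN_VALUE);
  }

  /** Records a watermark from one edge and emits the new minimum if it advanced. */
  void onWatermark(final int edgeIndex, final long timestamp) {
    latest[edgeIndex] = Math.max(latest[edgeIndex], timestamp);
    long min = Long.MAX_VALUE;
    for (final long t : latest) {
      min = Math.min(min, t);
    }
    if (min > lastEmitted) {
      lastEmitted = min;
      emitted.add(min);
    }
  }

  List<Long> getEmitted() {
    return emitted;
  }

  public static void main(final String[] args) {
    // Order from the review comment: source1 sends both watermarks first.
    final MinWatermarkTracker sequential = new MinWatermarkTracker(2);
    sequential.onWatermark(0, 500L);
    sequential.onWatermark(0, 1800L);
    sequential.onWatermark(1, 1000L);
    System.out.println(sequential.getEmitted()); // prints [1000]

    // Interleaved order: source2's watermark arrives between source1's two.
    final MinWatermarkTracker interleaved = new MinWatermarkTracker(2);
    interleaved.onWatermark(0, 500L);
    interleaved.onWatermark(1, 1000L);
    interleaved.onWatermark(0, 1800L);
    System.out.println(interleaved.getEmitted()); // prints [500, 1000]
  }
}
```

Under these assumptions, the emissions 500 and 1000 described in the test's Javadoc appear only for the interleaved order, so the test depends on the order in which the watermarks arrive.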




[GitHub] johnyangk commented on a change in pull request #137: [NEMO-232] Implement InputWatermarkManager

2018-10-31 Thread GitBox
johnyangk commented on a change in pull request #137: [NEMO-232] Implement 
InputWatermarkManager
URL: https://github.com/apache/incubator-nemo/pull/137#discussion_r229596994
 
 

 ##
 File path: 
runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+final List emittedWatermarks = new LinkedList<>();
 
 Review comment:
   Maybe a singleton Watermark initialized to -1?
   (Since the array size is always kept to 1 in the tests)

