[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example

2018-11-12 Thread GitBox
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle 
watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232606765
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
 ##
 @@ -96,14 +100,37 @@ private void fetchDataLazily() {
     final List> futures = readersForParentTask.read();
     numOfIterators = futures.size();
 
+    if (numOfIterators > 1) {
+      inputWatermarkManager = new MultiInputWatermarkManager(numOfIterators, new WatermarkCollector());
+    } else {
+      inputWatermarkManager = new SingleInputWatermarkManager(new WatermarkCollector());
+    }
+
     futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
         if (exception == null) {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-            elementQueue.offer(element);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Receive data : {}", element);
+            }
+
+            if (element instanceof WatermarkWithIndex) {
+              // watermark element
+              // the input watermark manager is accessed by multiple threads
+              // so we should synchronize it
+              synchronized (inputWatermarkManager) {
+                final WatermarkWithIndex watermarkWithIndex = (WatermarkWithIndex) element;
+                inputWatermarkManager.trackAndEmitWatermarks(
 
 Review comment:
   I will add the comment to `InputWatermarkManager`.
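
The situation the synchronization guards against is that each parent iterator is drained by its own thread, and any of those threads may report a watermark to the same manager. A minimal sketch of that situation follows, assuming a hypothetical `MinWatermarkTracker` class (not Nemo's `InputWatermarkManager`) that keeps the latest watermark per input edge and emits the minimum whenever it advances. Here the lock is taken inside the method rather than at the call site; that placement is an illustrative choice, not what the pull request does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a multi-input watermark manager; not Nemo's actual class. */
final class MinWatermarkTracker {
  private final long[] watermarks;              // latest watermark seen per input edge
  private long lastEmitted = Long.MIN_VALUE;    // last minimum that was emitted
  private final List<Long> emitted = new ArrayList<>();

  MinWatermarkTracker(final int numEdges) {
    watermarks = new long[numEdges];
    Arrays.fill(watermarks, Long.MIN_VALUE);    // no edge has reported yet
  }

  /** Thread-safe: callers on different reader threads need no external lock. */
  synchronized void track(final int edgeIndex, final long timestamp) {
    watermarks[edgeIndex] = Math.max(watermarks[edgeIndex], timestamp);
    long min = Long.MAX_VALUE;
    for (final long w : watermarks) {
      min = Math.min(min, w);
    }
    if (min > lastEmitted) {                    // emit only when the minimum progresses
      lastEmitted = min;
      emitted.add(min);
    }
  }

  /** Returns a snapshot of the minimum watermarks emitted so far. */
  synchronized List<Long> emitted() {
    return new ArrayList<>(emitted);
  }
}
```

With two edges, reporting 10 on edge 0 emits nothing because edge 1 has not reported yet; reporting 5 on edge 1 then emits 5, and a later 20 on edge 1 emits 10.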


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example

2018-11-12 Thread GitBox
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle 
watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232605065
 
 

 ##
 File path: 
common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
 ##
 @@ -84,7 +88,7 @@ private BytesDecoder(final InputStream inputStream) {
   returnedArray = true;
   return new byte[0];
 } else {
-  throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
+  throw new EOFException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
 
 Review comment:
   I'm not sure this resolves the issue. @seojangho, do you think it resolves issue #120?
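
For context on what the one-line change does and does not alter: `java.io.EOFException` is a subclass of `java.io.IOException`, so existing `catch (IOException e)` blocks keep working, while callers that care can now single out end-of-stream. The sketch below uses a hypothetical `EofDemo` class and the JDK's `DataInputStream`, not Nemo's `BytesDecoder`; whether this is what issue 120 asks for is not stated in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical demo class; it only illustrates the exception hierarchy. */
final class EofDemo {
  /** Returns "ok" if four bytes could be read, "eof" on end of stream, "io-error" otherwise. */
  static String readInt(final byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      in.readInt();                 // throws EOFException if fewer than 4 bytes remain
      return "ok";
    } catch (final EOFException e) {
      return "eof";                 // the more specific case must be caught first
    } catch (final IOException e) {
      return "io-error";            // would also have caught EOFException on its own
    }
  }
}
```

An empty array yields "eof" here, whereas a caller with only the `IOException` branch would report the same condition as a generic I/O failure.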


[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example

2018-11-11 Thread GitBox
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle 
watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232482115
 
 

 ##
 File path: 
examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
 ##
 @@ -41,19 +43,64 @@
   private WindowedWordCount() {
   }
 
+  public static final String INPUT_TYPE_BOUNDED = "bounded";
+  public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
+
+
+  private static PCollection<KV<String, Long>> getSource(
+    final Pipeline p,
+    final String[] args) {
+
+    final String inputType = args[2];
+    if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
+      final String inputFilePath = args[3];
+      return GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@Element final String elem,
+                                     final OutputReceiver<String> out) {
+            final String[] splitt = elem.split("!");
+            out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+          }
+        }))
+        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }));
+    } else if (inputType.compareTo(INPUT_TYPE_UNBOUNDED) == 0) {
+      // unbounded
+      return p.apply(GenerateSequence
+        .from(1)
+        .withRate(2, Duration.standardSeconds(1))
 
 Review comment:
   Sorry, I don't fully understand your question. Why would we set the number high? Emitting watermarks is not related to the data rate.


[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example

2018-11-11 Thread GitBox
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle 
watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232482013
 
 

 ##
 File path: 
runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
 ##
 @@ -74,7 +84,10 @@ public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermar
   if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
 // Watermark timestamp progress!
 // Emit the min watermark
-nextOperator.getTransform().onWatermark(minWatermark);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("Emit watermark {}, {}", minWatermark, watermarks);
 
 Review comment:
   Watermarks are generated at a low rate (e.g., one per second), so I think it is OK.
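
To make the cost argument concrete, here is a small sketch under two assumptions: that the concern is the cost of building the debug message, and that `java.util.logging` may stand in for the logger used in the diff (whose `isDebugEnabled()` corresponds roughly to `isLoggable(Level.FINE)` here). The class name `DebugLogCost` is hypothetical. It counts how many times the message is actually built.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical demo: counts how often a guarded debug message is constructed. */
final class DebugLogCost {
  private static final Logger LOG = Logger.getLogger("DebugLogCost");

  /** Simulates emitting watermarks at the given log level; returns the number of messages built. */
  static int emit(final Level level, final int watermarksEmitted) {
    LOG.setLevel(level);
    final AtomicInteger built = new AtomicInteger();
    for (int i = 0; i < watermarksEmitted; i++) {
      final long timestamp = i;
      if (LOG.isLoggable(Level.FINE)) {         // guard: skip all work when debug is off
        built.incrementAndGet();
        LOG.fine("Emit watermark " + timestamp);
      }
    }
    return built.get();
  }
}
```

When the level is above `FINE`, no message is built regardless of how many watermarks are emitted; when it is enabled, the work is one message per emitted watermark, which stays small if watermarks arrive about once per second.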

