[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232606765

## File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java ##
@@ -96,14 +100,37 @@ private void fetchDataLazily() {
     final List> futures = readersForParentTask.read();
     numOfIterators = futures.size();

+    if (numOfIterators > 1) {
+      inputWatermarkManager = new MultiInputWatermarkManager(numOfIterators, new WatermarkCollector());
+    } else {
+      inputWatermarkManager = new SingleInputWatermarkManager(new WatermarkCollector());
+    }
+
     futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
         if (exception == null) {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-            elementQueue.offer(element);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Receive data : {}", element);
+            }
+
+            if (element instanceof WatermarkWithIndex) {
+              // watermark element
+              // the input watermark manager is accessed by multiple threads
+              // so we should synchronize it
+              synchronized (inputWatermarkManager) {
+                final WatermarkWithIndex watermarkWithIndex = (WatermarkWithIndex) element;
+                inputWatermarkManager.trackAndEmitWatermarks(

Review comment:
will add the comment to `InputWatermarkManager`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
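The locking pattern in the hunk above can be illustrated in isolation. The following is only a minimal sketch under stated assumptions: `SynchronizedWatermarkSketch` and `LatestWatermarks` are hypothetical names made up for this illustration and are not Nemo's `InputWatermarkManager` API. It shows one worker thread per input, each taking the monitor of a shared, non-thread-safe object before reporting a watermark timestamp.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; LatestWatermarks is NOT Nemo's InputWatermarkManager.
final class SynchronizedWatermarkSketch {

  /** Plain, non-thread-safe state: callers must hold this object's monitor. */
  static final class LatestWatermarks {
    final Map<Integer, Long> latestByInput = new HashMap<>();
    int updates = 0;

    void track(final int inputIndex, final long timestamp) {
      // Keep the largest timestamp seen so far for this input.
      latestByInput.merge(inputIndex, timestamp, Long::max);
      updates++;
    }
  }

  /** Starts one thread per input; each reports timestamps 1..perInput. */
  static LatestWatermarks run(final int numInputs, final int perInput) {
    final LatestWatermarks shared = new LatestWatermarks();
    final ExecutorService pool = Executors.newFixedThreadPool(numInputs);
    for (int i = 0; i < numInputs; i++) {
      final int inputIndex = i;
      pool.submit(() -> {
        for (long ts = 1; ts <= perInput; ts++) {
          // Same idea as the diff: lock the shared object for each watermark.
          synchronized (shared) {
            shared.track(inputIndex, ts);
          }
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return shared;
  }

  public static void main(final String[] args) {
    final LatestWatermarks result = run(4, 10_000);
    System.out.println(result.updates + " updates, latest = " + result.latestByInput);
  }
}
```

Because every call to `track` happens while holding the monitor of `shared`, the update counter and the map stay consistent even though neither is thread-safe by itself. Whether the lock belongs at the call site or inside the manager is a design choice this sketch does not settle.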
[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232605065

## File path: common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java ##
@@ -84,7 +88,7 @@ private BytesDecoder(final InputStream inputStream) {
           returnedArray = true;
           return new byte[0];
         } else {
-          throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
+          throw new EOFException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.

Review comment:
I'm not sure it resolves the issue. @seojangho Do you think it resolves issue 120?
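For context on the change being discussed: `java.io.EOFException` is a subclass of `java.io.IOException`, so callers that catch `IOException` keep working, while newer callers can tell end of input apart from other I/O failures. The sketch below illustrates only that language-level fact. `EofSketch` and its methods are hypothetical and are not taken from `BytesDecoderFactory`, and the sketch makes no claim about whether the change resolves issue 120.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch; not the BytesDecoderFactory implementation.
final class EofSketch {

  /** Returns the first byte of the stream, or signals end of input. */
  static int readFirstByte(final InputStream in) throws IOException {
    final int b = in.read();
    if (b == -1) {
      throw new EOFException("EoF (empty partition)!");
    }
    return b;
  }

  /** A caller that distinguishes end of input from other I/O failures. */
  static String classify(final byte[] input) {
    try {
      return "first byte = " + readFirstByte(new ByteArrayInputStream(input));
    } catch (final EOFException e) {
      return "end of input";
    } catch (final IOException e) {
      return "I/O failure";
    }
  }

  /** A caller written against IOException only; it still catches EOFException. */
  static boolean legacyCallerSeesFailure(final byte[] input) {
    try {
      readFirstByte(new ByteArrayInputStream(input));
      return false;
    } catch (final IOException e) {
      return true;
    }
  }

  public static void main(final String[] args) {
    System.out.println(classify(new byte[0]));
    System.out.println(classify(new byte[] {7}));
    System.out.println(legacyCallerSeesFailure(new byte[0]));
  }
}
```

The more specific `catch (EOFException e)` clause has to come before `catch (IOException e)`; otherwise the compiler rejects the second clause as unreachable.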
[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232482115

## File path: examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java ##
@@ -41,19 +43,64 @@ private WindowedWordCount() {
   }

+  public static final String INPUT_TYPE_BOUNDED = "bounded";
+  public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
+
+
+  private static PCollection<KV<String, Long>> getSource(
+    final Pipeline p,
+    final String[] args) {
+
+    final String inputType = args[2];
+    if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
+      final String inputFilePath = args[3];
+      return GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@Element final String elem,
+                                     final OutputReceiver<String> out) {
+            final String[] splitt = elem.split("!");
+            out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+          }
+        }))
+        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }));
+    } else if (inputType.compareTo(INPUT_TYPE_UNBOUNDED) == 0) {
+      // unbounded
+      return p.apply(GenerateSequence
+        .from(1)
+        .withRate(2, Duration.standardSeconds(1))

Review comment:
Sorry, I don't fully understand your question. Why would we set the number high? Emitting watermarks is not related to the data rate.
[GitHub] taegeonum commented on a change in pull request #153: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example
taegeonum commented on a change in pull request #153: [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example
URL: https://github.com/apache/incubator-nemo/pull/153#discussion_r232482013

## File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java ##
@@ -74,7 +84,10 @@ public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermar
       if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
         // Watermark timestamp progress!
         // Emit the min watermark
-        nextOperator.getTransform().onWatermark(minWatermark);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Emit watermark {}, {}", minWatermark, watermarks);

Review comment:
Watermarks are generated at a low rate (e.g., one per second), so I think it is OK.
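The condition in the hunk above emits (and logs) only when the minimum watermark across inputs advances. A minimal, single-threaded sketch of that rule follows. It assumes plain `long` timestamps and a hypothetical `MinWatermarkSketch` class; it is not the `MultiInputWatermarkManager` implementation, which works on `Watermark` objects and forwards to a next operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of "emit only when the minimum advances".
final class MinWatermarkSketch {
  private final long[] latest;            // latest timestamp per input edge
  private long lastEmitted = Long.MIN_VALUE;
  private final List<Long> emitted = new ArrayList<>();

  MinWatermarkSketch(final int numEdges) {
    latest = new long[numEdges];
    // Until an edge reports, it holds the minimum back at Long.MIN_VALUE.
    Arrays.fill(latest, Long.MIN_VALUE);
  }

  void track(final int edgeIndex, final long timestamp) {
    latest[edgeIndex] = timestamp;
    long min = Long.MAX_VALUE;
    for (final long t : latest) {
      min = Math.min(min, t);
    }
    if (min > lastEmitted) {
      // Progress: this is where an emit (and a debug log) would happen.
      lastEmitted = min;
      emitted.add(min);
    }
  }

  List<Long> emitted() {
    return emitted;
  }

  public static void main(final String[] args) {
    final MinWatermarkSketch m = new MinWatermarkSketch(2);
    m.track(0, 5); // edge 1 has not reported yet: nothing emitted
    m.track(1, 3); // min = 3: emitted
    m.track(1, 7); // min = 5: emitted
    m.track(0, 6); // min = 6: emitted
    m.track(1, 8); // min stays 6: nothing emitted
    System.out.println(m.emitted()); // prints [3, 5, 6]
  }
}
```

In this sketch the number of emits can never exceed the number of incoming watermarks, which is consistent with the reviewer's point that a debug log on this path is bounded by the watermark rate rather than the data rate.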