This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 4c06bd3 SAMZA-2293: Propagate the watermark future to StreamOperatorTask correctly 4c06bd3 is described below commit 4c06bd3a6b6ee1be2624270d5d1b015e83e0f6bf Author: mynameborat <bharath.kumarasubraman...@gmail.com> AuthorDate: Tue Aug 6 19:19:09 2019 -0700 SAMZA-2293: Propagate the watermark future to StreamOperatorTask correctly Author: mynameborat <bharath.kumarasubraman...@gmail.com> Reviewers: xinyuiscool <xi...@linkedin.com> Closes #1129 from mynameborat/async-watermark-propagation-fix --- .../src/main/java/org/apache/samza/operators/impl/OperatorImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 8d4ae21..3d32be3 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -191,7 +191,7 @@ public abstract class OperatorImpl<M, RM> { .toArray(CompletableFuture[]::new)); }); - result.thenAccept(x -> { + return result.thenAccept(x -> { WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn(); if (watermarkFn != null) { // check whether there is new watermark emitted from the user function @@ -199,8 +199,6 @@ public abstract class OperatorImpl<M, RM> { propagateWatermark(outputWm, collector, coordinator); } }); - - return result; } /** @@ -415,7 +413,7 @@ public abstract class OperatorImpl<M, RM> { .toArray(CompletableFuture[]::new)); } - watermarkFuture.thenCompose(res -> propagateWatermark(outputWm, collector, coordinator)); + watermarkFuture = watermarkFuture.thenCompose(res -> propagateWatermark(outputWm, collector, coordinator)); } return watermarkFuture;