[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617421#comment-16617421 ] Piotr Nowojski commented on FLINK-10327:

I would also be in favour of *won't fixing*, but maybe others have a different opinion?

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Major
> Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By passing {{processWatermark}} calls to the function we would give a way to perform some actions on watermark advancement, like state clean up or emitting some results after accumulating some data.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617287#comment-16617287 ] Kostas Kloudas commented on FLINK-10327:

[~pnowojski] as you mention, in terms of code the change may be just a couple of lines, but the implications of exposing such functionality at the function level go well beyond that. As I said in the PR:

1) Manipulating the watermark should be done explicitly from a WatermarkEmitter (which is exposed to the user).
2) If you want to run a "callback" upon watermark, then so far the trick is to register a timer for watermark + 1. This is even more "powerful" than just exposing `processWatermark` to the ProcessFunction, because when reacting to a timer you are already in a keyed context (so you have access to state).
3) If you want to go even lower than that, then you should write your own operator and be explicit about how to handle the different parts that constitute an operator, i.e. collectors, timestamps of emitted elements (collector.setTimestamp), etc.

If we agree on that, then I would recommend closing the issue as "won't fix".
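Point 2 relies on the rule that an event-time timer fires once the watermark reaches its timestamp, so a timer registered at the current watermark + 1 fires on the very next watermark advance. The sketch below models that rule in plain Java with no Flink dependency; `ToyTimerService` and its methods are hypothetical stand-ins, not Flink's API. In a real keyed `ProcessFunction` the rough equivalent would be calling `ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)` and re-registering from `onTimer`, keeping in mind that such timers are registered per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Toy model of event-time timers (hypothetical, not Flink's TimerService): a timer fires
 * as soon as the watermark reaches or passes its timestamp.
 */
class ToyTimerService {
    private long currentWatermark = Long.MIN_VALUE;
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    /** Watermark values at which the "callback upon watermark" ran. */
    final List<Long> firedAtWatermark = new ArrayList<>();

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Moves the watermark forward and fires every timer with timestamp <= watermark. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            onTimer(timers.poll());
        }
    }

    /** The callback: record the watermark, then re-arm at watermark + 1 for the next advance. */
    private void onTimer(long timestamp) {
        firedAtWatermark.add(currentWatermark);
        // Guard against overflow on the final Long.MAX_VALUE (end-of-input) watermark.
        if (currentWatermark < Long.MAX_VALUE) {
            registerEventTimeTimer(currentWatermark + 1);
        }
    }

    public static void main(String[] args) {
        ToyTimerService timers = new ToyTimerService();
        timers.registerEventTimeTimer(timers.currentWatermark() + 1); // arm the first callback
        timers.advanceWatermark(10); // fires: the watermark passed Long.MIN_VALUE + 1
        timers.advanceWatermark(10); // no advance, so no callback
        timers.advanceWatermark(25); // fires: the watermark passed 11
        System.out.println(timers.firedAtWatermark); // [10, 25]
    }
}
```

Re-arming from inside the callback is what turns a one-shot timer into a "call me on every watermark advance" hook.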
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617218#comment-16617218 ] Piotr Nowojski commented on FLINK-10327:

[~kkl0u], my argument in favour of exposing {{processWatermark}} calls is that operators are public API, and I don't understand the argument that we shouldn't expose them in functions when they are already exposed and publicly available at the lower level, especially if there is an easy way to do so. Except maybe for the issue of how to handle keyed state and what should happen on state accesses during {{processWatermark}} calls. Maybe this is a good argument why we shouldn't expose {{processWatermark}} to higher-level functions?
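To make the keyed-state question concrete: keyed state is resolved against the key of the record currently being processed, while a watermark carries no key at all. The toy model below (hypothetical `ToyKeyedOperator`, not a Flink class) assumes the current key is simply whatever the last element set, which is one way an exposed `processWatermark` hook could end up silently reading an arbitrary key's state, or failing outright before any element has arrived. A timer callback, by contrast, is invoked for the key under which the timer was registered.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Flink's API) of keyed state: every read is resolved
 * against the "current key", which a keyed element sets and nothing resets.
 */
class ToyKeyedOperator {
    private final Map<String, Long> countPerKey = new HashMap<>();
    private String currentKey; // stays null until the first keyed element arrives

    /** A keyed element sets the key context, so state access here is well defined. */
    void processElement(String key) {
        currentKey = key;
        countPerKey.merge(key, 1L, Long::sum);
    }

    /** Reads "the" count, i.e. the count of whichever key happens to be current. */
    long readCountInCurrentContext() {
        if (currentKey == null) {
            throw new IllegalStateException("No key set: called outside of a keyed context");
        }
        return countPerKey.getOrDefault(currentKey, 0L);
    }

    /** Hypothetical watermark hook: the watermark has no key, so only a leftover key is visible. */
    long processWatermark(long watermark) {
        return readCountInCurrentContext();
    }

    public static void main(String[] args) {
        ToyKeyedOperator op = new ToyKeyedOperator();
        try {
            op.processWatermark(5);
        } catch (IllegalStateException e) {
            System.out.println("before any element: " + e.getMessage());
        }
        op.processElement("a");
        op.processElement("a");
        op.processElement("b");
        // Returns the count for "b" only because "b" was the last key processed.
        System.out.println("after elements: " + op.processWatermark(10));
    }
}
```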
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617201#comment-16617201 ] ASF GitHub Bot commented on FLINK-10327:

pnowojski closed pull request #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index c2c130ef58d..faf8fc7943a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -84,6 +85,15 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 
+	/**
+	 * Called when watermark has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<O> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 20c10840c2c..39a9a7d2cd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -98,6 +99,39 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
 
+	/**
+	 * Called when combined watermark of both inputs has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when watermark of the first input has advanced. If this update will trigger an update
+	 * of the combined watermark, this call will be followed by {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark1(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when watermark of the second input has advanced. If this update will trigger an update
+	 * of the combined watermark, this call will be followed by {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark2(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
 	 * {@link #processElement2(Object, Context, Collector)}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
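The `CoProcessFunction` javadoc in the diff implies a specific call order: the per-input hook runs first, and `processWatermark` follows only if the combined watermark advanced. The sketch below models that order in plain Java, assuming (to our understanding of Flink's two-input operators) that the combined watermark is the minimum of both input watermarks and only ever moves forward. `ToyTwoInputWatermarks` is hypothetical and records the calls as strings instead of invoking a user function.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy two-input operator (hypothetical, not Flink's classes): per-input watermark hooks
 * fire first; the combined hook fires only when min(input1, input2) moves forward.
 */
class ToyTwoInputWatermarks {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    /** Call log standing in for invocations of the user function's hooks. */
    final List<String> calls = new ArrayList<>();

    void processWatermark1(long mark) {
        input1Watermark = mark;
        calls.add("processWatermark1(" + mark + ")");
        maybeAdvanceCombined();
    }

    void processWatermark2(long mark) {
        input2Watermark = mark;
        calls.add("processWatermark2(" + mark + ")");
        maybeAdvanceCombined();
    }

    /** The combined watermark is the slower of the two inputs and never regresses. */
    private void maybeAdvanceCombined() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            calls.add("processWatermark(" + newMin + ")");
        }
    }

    public static void main(String[] args) {
        ToyTwoInputWatermarks op = new ToyTwoInputWatermarks();
        op.processWatermark1(10); // input 2 still at Long.MIN_VALUE: combined stays put
        op.processWatermark2(7);  // min(10, 7) = 7: combined advances
        op.processWatermark2(15); // min(10, 15) = 10: combined advances
        op.processWatermark1(12); // min(12, 15) = 12: combined advances
        System.out.println(op.calls);
    }
}
```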
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617200#comment-16617200 ] ASF GitHub Bot commented on FLINK-10327:

pnowojski commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421924359

I'm closing this PR since I have realised that it's incomplete (missing implementation in other types of `ProcessFunction`s). @kl0u let's move this discussion to Jira.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613537#comment-16613537 ] ASF GitHub Bot commented on FLINK-10327:

kl0u commented on issue #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411

Hi @pnowojski! I can understand that this can be an interesting addition for some use cases, but it is a big one, and it should be discussed more thoroughly and, most importantly, more publicly. I would be against merging it as just a sub-commit of another feature.

The reason is that this allows users to "play" with watermarks from the level of a `Function` and not an `Operator`, which, intentionally, has been the case so far. If you want to "hold back" the watermark, then this should be done by a watermark assigner. If you want to run a "callback" upon watermark, then so far the trick is to register a timer for `watermark + 1`. I can find use cases which do not fall into any of the above, but for those we have so far implemented custom operators.
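The suggestion that "holding back" the watermark belongs in a watermark assigner can be illustrated with the common bounded-out-of-orderness strategy: the watermark trails the largest timestamp seen so far by a fixed bound. The class below is a simplified, hypothetical model in the spirit of Flink's `BoundedOutOfOrdernessTimestampExtractor`, not that class itself, and it leaves out the periodic emission machinery.

```java
/**
 * Simplified, hypothetical model (not Flink's BoundedOutOfOrdernessTimestampExtractor):
 * the watermark trails the highest timestamp seen so far by a fixed bound and never
 * moves backwards.
 */
class ToyBoundedOutOfOrdernessAssigner {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    ToyBoundedOutOfOrdernessAssigner(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must be >= 0");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start so that the first candidate watermark is Long.MIN_VALUE without underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    /** Called per element: remembers the largest timestamp and passes it through unchanged. */
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    /** Called periodically: the held-back watermark, never lower than one already emitted. */
    long getCurrentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrderness;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        ToyBoundedOutOfOrdernessAssigner assigner = new ToyBoundedOutOfOrdernessAssigner(5);
        assigner.extractTimestamp(10);
        assigner.extractTimestamp(12);
        assigner.extractTimestamp(8); // out of order, but within the bound
        System.out.println(assigner.getCurrentWatermark()); // 7 = 12 - 5
        assigner.extractTimestamp(20);
        System.out.println(assigner.getCurrentWatermark()); // 15 = 20 - 5
        assigner.extractTimestamp(3); // late element: the watermark does not regress
        System.out.println(assigner.getCurrentWatermark()); // 15
    }
}
```

Because the delay is decided at the source of the watermark, every downstream operator sees the same held-back value, which is the separation of concerns argued for above.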
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612390#comment-16612390 ] ASF GitHub Bot commented on FLINK-10327:

pnowojski opened a new pull request #6687: [FLINK-10327][streaming] Expose processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687

This PR exposes hooks for `processWatermark`, `processWatermark1` and `processWatermark2` to `ProcessFunction` and `CoProcessFunction`.

## Verifying this change

Added new tests to `ProcessOperatorTest` and `CoProcessOperatorTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Piotr Nowojski
> Assignee: Shimin Yang
> Priority: Major
> Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By passing {{processWatermark}} calls to the function we would give a way to perform some actions on watermark advancement, like state clean up or emitting some results after accumulating some data.
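As an illustration of the use case from the issue description (emitting results after accumulating some data), here is roughly what a function using the proposed single-input hook might do. `ToyWatermarkFlushingFunction` is a hypothetical plain-Java stand-in, not Flink's `ProcessFunction` (the hook was never merged); elements are reduced to their timestamps, and an output list plays the role of the `Collector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical stand-in (not Flink's ProcessFunction) for a function using the proposed
 * processWatermark hook: elements are buffered and released in timestamp order once the
 * watermark has reached them. Elements are reduced to their timestamps for brevity.
 */
class ToyWatermarkFlushingFunction {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    /** Accumulates an element instead of emitting it right away. */
    void processElement(long timestamp) {
        buffer.add(timestamp);
    }

    /** The proposed hook: emit every buffered element at or below the new watermark, in order. */
    void processWatermark(long watermark, List<Long> out) {
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            out.add(buffer.poll());
        }
    }

    public static void main(String[] args) {
        ToyWatermarkFlushingFunction fn = new ToyWatermarkFlushingFunction();
        List<Long> out = new ArrayList<>(); // plays the role of the Collector
        fn.processElement(5);
        fn.processElement(3);
        fn.processElement(9);
        fn.processElement(12);
        fn.processWatermark(8, out);
        System.out.println(out); // [3, 5]
        fn.processWatermark(12, out);
        System.out.println(out); // [3, 5, 9, 12]
    }
}
```

On a keyed stream, the timer-based approach from the discussion could achieve a similar flush by registering an event-time timer for each buffered timestamp.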
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612388#comment-16612388 ] Piotr Nowojski commented on FLINK-10327:

[~dangdangdang] oops, sorry, I forgot to assign myself to this one after creating it. I already have working code and I'm in the process of creating a PR for it :(

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Piotr Nowojski
> Assignee: Shimin Yang
> Priority: Major
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By passing {{processWatermark}} calls to the function we would give a way to perform some actions on watermark advancement, like state clean up or emitting some results after accumulating some data.
[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612278#comment-16612278 ] Shimin Yang commented on FLINK-10327:

Sounds good, I'll work on it.