[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-17 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617421#comment-16617421
 ] 

Piotr Nowojski commented on FLINK-10327:


I would also be in favour of closing this as *won't fix*, but maybe others have 
a different opinion?

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would give users a way 
> to perform some actions on watermark advancement, such as state clean-up or 
> emitting some results after accumulating some data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-17 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617287#comment-16617287
 ] 

Kostas Kloudas commented on FLINK-10327:


[~pnowojski] as you mention, in terms of code, the change may be just a couple 
of lines, but the implications of exposing such functionality at the function 
level go further than that.

As I said in the PR:

1) manipulating the watermark should be explicitly done from a WatermarkEmitter 
(which is exposed to the user).
2) if you want to run a "callback" upon watermark, then so far the trick is to 
register a timer for watermark + 1. This is even more "powerful" than just 
exposing the `processWatermark` to the processFunction, as by reacting to a 
timer, you are already in a keyed context (so you have access to state).
3) if you want to go even lower than that, then you should write your own 
operator, and be explicit on how to handle the different parts that constitute 
an operator, i.e. collectors, timestamps of emitted elements 
(collector.setTimestamp), etc.

If we agree on that, then I would recommend closing the issue as "won't fix".
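
The timer trick from point 2 can be sketched without any Flink dependency. The class below is a hypothetical stand-in and not Flink's API: `WatermarkCallbackSketch`, `processElement`, `advanceWatermark` and `onTimer` are names made up for illustration, and the initial watermark of 0 is an assumption. It only models the rule that an event-time timer fires once the watermark reaches its timestamp, so a timer registered for watermark + 1 acts as a per-key callback on the next watermark advancement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of per-key event-time timers; NOT Flink's TimerService. */
final class WatermarkCallbackSketch {
    // timer timestamp -> keys that registered a timer for that timestamp
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();
    private long currentWatermark = 0L; // assumed starting point for the sketch

    /** On each element, register a timer for "current watermark + 1" under the element's key. */
    void processElement(String key) {
        timers.computeIfAbsent(currentWatermark + 1, t -> new TreeSet<>()).add(key);
    }

    /** Advance the watermark and fire every timer whose timestamp is <= the new watermark. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> entry = timers.pollFirstEntry();
            for (String key : entry.getValue()) {
                onTimer(entry.getKey(), key);
            }
        }
    }

    /** The "callback": invoked once per key, so per-key state could be read or cleaned here. */
    private void onTimer(long timestamp, String key) {
        fired.add(key + "@" + timestamp);
    }

    List<String> fired() {
        return fired;
    }
}
```

Because the callback is delivered per registered key, it runs with a key in scope, which is the advantage over a plain watermark hook that the comment above points out.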



[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-17 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617218#comment-16617218
 ] 

Piotr Nowojski commented on FLINK-10327:


[~kkl0u], my argument in favour of exposing {{processWatermark}} calls is that 
operators are public API, and I don't understand the argument that we shouldn't 
expose these calls in functions when they are already exposed and publicly 
available on the lower level, especially if there is an easy way to do so. The 
one exception may be the question of how to handle keyed state and what should 
happen on state accesses during {{processWatermark}} calls. Maybe this is a 
good argument why we shouldn't expose {{processWatermark}} to higher level 
functions?
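
The keyed-state concern can be illustrated with a deliberately simplified model. Everything below is hypothetical (`KeyedStateSketch` and its methods are not Flink classes, and Flink's real behaviour in this situation is not specified in this thread). The sketch only shows the shape of the problem: element processing runs with a current key, whereas a watermark arrives once per operator instance with no key in scope.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of key-scoped state; NOT Flink's keyed state backend. */
final class KeyedStateSketch {
    private final Map<String, Long> countsPerKey = new HashMap<>();
    private String currentKey; // null whenever no element (or timer) is being processed

    /** Keyed state access: only meaningful while a key is in scope. */
    private long incrementCount() {
        if (currentKey == null) {
            throw new IllegalStateException("no key in scope, keyed state is not accessible");
        }
        return countsPerKey.merge(currentKey, 1L, Long::sum);
    }

    /** Elements carry a key, so state access is well defined. */
    void processElement(String key) {
        currentKey = key;
        try {
            incrementCount();
        } finally {
            currentKey = null;
        }
    }

    /** A watermark belongs to no key, so the same state access has nothing to resolve against. */
    void processWatermark(long watermark) {
        incrementCount(); // throws in this model
    }

    long countFor(String key) {
        return countsPerKey.getOrDefault(key, 0L);
    }
}
```

In this model a function-level watermark hook would need either a rule forbidding keyed state access or some way of iterating over all keys, which is the open design question raised above.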



[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617201#comment-16617201
 ] 

ASF GitHub Bot commented on FLINK-10327:


pnowojski closed pull request #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687
 
 
   

This is a PR from a forked repository. As GitHub hides the original diff of 
such pull requests, it is displayed below for the sake of provenance:

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index c2c130ef58d..faf8fc7943a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -84,6 +85,15 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 
+	/**
+	 * Called when watermark has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<O> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 20c10840c2c..39a9a7d2cd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -98,6 +99,39 @@
 	 */
 	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
 
+	/**
+	 * Called when combined watermark of both inputs has advanced.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to
+	 */
+	public void processWatermark(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when watermark of the first input has advanced. If this update will trigger an update
+	 * of the combined watermark, this call will be followed by {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark1(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
+	/**
+	 * Called when watermark of the second input has advanced. If this update will trigger an update
+	 * of the combined watermark, this call will be followed by {@link #processWatermark(Watermark, Collector)}
+	 * call.
+	 *
+	 * @param mark The {@link Watermark} that triggered this call
+	 * @param out The collector to emit resulting elements to. Results emitted will have a timestamp
+	 *            set to the value before advancing combined watermark.
+	 */
+	public void processWatermark2(Watermark mark, Collector<OUT> out) throws Exception {
+	}
+
 	/**
 	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
 	 * {@link #processElement2(Object, Context, Collector)}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java

[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617200#comment-16617200
 ] 

ASF GitHub Bot commented on FLINK-10327:


pnowojski commented on issue #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421924359
 
 
I'm closing this PR since I have realised that it's incomplete (it is missing 
the implementation in other types of `ProcessFunction`s). @kl0u let's move this 
discussion to Jira.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613537#comment-16613537
 ] 

ASF GitHub Bot commented on FLINK-10327:


kl0u commented on issue #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687#issuecomment-421017411
 
 
Hi @pnowojski!

I can understand that this can be an interesting addition for some use cases, 
but it is a big one, and it should be discussed more thoroughly and, most 
importantly, more publicly. I would be against merging it as just a sub-commit 
of another feature.

The reason is that this allows users to "play" with watermarks at the level of 
a `Function` rather than an `Operator`; so far this has, intentionally, only 
been possible at the `Operator` level.

If you want to "hold back" the watermark, then this should be done by a 
watermark assigner.

If you want to run a "callback" upon watermark, then so far the trick is to 
register a timer for `watermark + 1`.

I can find use cases which do not fall into any of the above, but for those 
we have so far implemented custom operators.






[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612390#comment-16612390
 ] 

ASF GitHub Bot commented on FLINK-10327:


pnowojski opened a new pull request #6687: [FLINK-10327][streaming] Expose 
processWatermarks notifications to (Co)ProcessFunction
URL: https://github.com/apache/flink/pull/6687
 
 
   This PR exposes hooks for `processWatermark`, `processWatermark1` and 
`processWatermark2` to `ProcessFunction` and `CoProcessFunction`.
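
The relationship between the three hooks on a two-input function can be sketched as follows. The call order follows the Javadoc in the diff (a per-input call, followed by the combined call only if the combined watermark advances). Taking the combined watermark as the minimum of the two input watermarks is how two-input operators usually combine them, and is an assumption here rather than something stated in the PR. `CombinedWatermarkSketch` and its method names are hypothetical, and the hooks are recorded as strings instead of being invoked on a real function.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the hook ordering for a two-input function; NOT Flink code. */
final class CombinedWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    private final List<String> calls = new ArrayList<>();

    void onInput1Watermark(long watermark) {
        input1Watermark = watermark;
        calls.add("processWatermark1(" + watermark + ")");
        maybeAdvanceCombined();
    }

    void onInput2Watermark(long watermark) {
        input2Watermark = watermark;
        calls.add("processWatermark2(" + watermark + ")");
        maybeAdvanceCombined();
    }

    /** The combined watermark is assumed to be the minimum of both inputs. */
    private void maybeAdvanceCombined() {
        long candidate = Math.min(input1Watermark, input2Watermark);
        if (candidate > combinedWatermark) {
            combinedWatermark = candidate;
            calls.add("processWatermark(" + candidate + ")");
        }
    }

    List<String> calls() {
        return calls;
    }
}
```

With this model, a watermark on one input alone never triggers the combined hook until the other input has also produced a watermark.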
   
   ## Verifying this change
   
   Added new tests to `ProcessOperatorTest` and `CoProcessOperatorTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Shimin Yang
>Priority: Major
>  Labels: pull-request-available
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would give users a way 
> to perform some actions on watermark advancement, such as state clean-up or 
> emitting some results after accumulating some data.





[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612388#comment-16612388
 ] 

Piotr Nowojski commented on FLINK-10327:


[~dangdangdang] oops, sorry, I forgot to assign this one to myself after 
creating it. I already have working code and I'm in the process of creating a 
PR for it :(

> Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction
> ---
>
> Key: FLINK-10327
> URL: https://issues.apache.org/jira/browse/FLINK-10327
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Piotr Nowojski
>Assignee: Shimin Yang
>Priority: Major
>
> Currently {{CoProcessFunction}} cannot react to watermark advancement. By 
> passing {{processWatermark}} calls to the function, we would give users a way 
> to perform some actions on watermark advancement, such as state clean-up or 
> emitting some results after accumulating some data.





[jira] [Commented] (FLINK-10327) Pass processWatermark calls from (Co)ProcessOperator to (Co)ProcessFunction

2018-09-12 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612278#comment-16612278
 ] 

Shimin Yang commented on FLINK-10327:

Sounds good, I'll work on it.
