[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388031#comment-16388031 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5481

> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -------------------------------------------------------------------------
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Jürgen Thomann
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.5.0
>
> Currently it is required to store the key of a keyBy() in the processElement
> method to have access to it in the OnTimerContext.
> This is inconvenient because you have to check in the processElement method,
> for every element, whether the key is already stored, and set it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar
> method. Having it in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
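The request in the issue description can be pictured with a small plain-Java sketch. It does not use Flink: `KeyedTimerContext`, `KeyedTimerFunction`, and `ToyTimerHarness` are hypothetical stand-ins invented for illustration, and the only name taken from the issue is `getCurrentKey()`. The point is that the runtime already knows which key a timer belongs to, so it can hand the key to `onTimer()` instead of making the user cache it in `processElement`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a timer context that exposes the key. */
interface KeyedTimerContext<K> {
    K getCurrentKey();
}

/** Hypothetical stand-in for a keyed function with an onTimer() callback. */
interface KeyedTimerFunction<K, O> {
    void onTimer(long timestamp, KeyedTimerContext<K> ctx, List<O> out);
}

/** Toy harness (not Flink): holds one timer per key and fires them in key order. */
final class ToyTimerHarness<K extends Comparable<K>, O> {
    private final Map<K, Long> timers = new TreeMap<>();
    private final KeyedTimerFunction<K, O> function;

    ToyTimerHarness(KeyedTimerFunction<K, O> function) {
        this.function = function;
    }

    void registerTimer(K key, long timestamp) {
        timers.put(key, timestamp);
    }

    List<O> fireAll() {
        List<O> out = new ArrayList<>();
        for (Map.Entry<K, Long> timer : timers.entrySet()) {
            final K key = timer.getKey();
            // The harness knows the timer's key, so the user function
            // does not have to store it per element.
            function.onTimer(timer.getValue(), () -> key, out);
        }
        timers.clear();
        return out;
    }
}
```

A user function written against this sketch reads the key directly, for example `(ts, ctx, out) -> out.add(ctx.getCurrentKey() + " timed out at " + ts)`, with no per-element bookkeeping.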
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387512#comment-16387512 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5481

Thanks @bowenli86 ! I will merge as soon as Travis gives green.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387304#comment-16387304 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481

@kl0u @aljoscha I added the Scala example, and I believe the only build failure in Travis is unrelated to this change.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385880#comment-16385880 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172136142

--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

@bowenli86 As soon as the scala example is added, I can take care of the other two comments and merge! Let me know when you update the PR, and thanks for the work!
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385877#comment-16385877 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172133573

--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
--- End diff --

I believe this is now `public void onTimer(long timestamp, OnTimerContext ctx, Collector out)`, right? @kl0u you could fix this while merging.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385876#comment-16385876 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135390

--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

Maybe also add Scala example code.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385875#comment-16385875 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135104

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
@@ -70,21 +69,15 @@ public void open() throws Exception {
     @Override
     public void onEventTime(InternalTimertimer) throws Exception {
         collector.setAbsoluteTimestamp(timer.getTimestamp());
-        onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-        onTimerContext.timer = timer;
-        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-        onTimerContext.timeDomain = null;
-        onTimerContext.timer = null;
+        reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
--- End diff --

Hate to be picky, but I think the name is a bit misleading, and we could probably put all of this in a single method `invokeUserTime()` that does what `reinitialize()` and `reset()` do. @kl0u I think you can quickly fix that when merging.
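The refactoring suggested in this review comment can be sketched in plain Java. This is not the Flink operator: `Domain`, `TimerCallback`, `TimerContext`, and `SketchOperator` are hypothetical stand-ins, the method name `invokeUserFunction` is chosen here for illustration, and the `try`/`finally` is an addition of this sketch rather than something visible in the quoted diff. It shows one method that sets the shared context, calls the user callback, and clears the context again, so that both time domains go through the same path.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a time-domain enum; only the two domains matter here. */
enum Domain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical user callback, standing in for a function with onTimer(). */
interface TimerCallback<K> {
    void onTimer(long timestamp, TimerContext<K> ctx, List<String> out);
}

/** Hypothetical reusable context; its fields are only valid while a callback runs. */
final class TimerContext<K> {
    Domain domain;
    K key;
}

/** Toy operator (not Flink) that routes both timer kinds through one helper. */
final class SketchOperator<K> {
    private final TimerContext<K> context = new TimerContext<>();
    private final TimerCallback<K> userFunction;
    final List<String> output = new ArrayList<>();

    SketchOperator(TimerCallback<K> userFunction) {
        this.userFunction = userFunction;
    }

    void onEventTime(K key, long timestamp) {
        invokeUserFunction(Domain.EVENT_TIME, key, timestamp);
    }

    void onProcessingTime(K key, long timestamp) {
        invokeUserFunction(Domain.PROCESSING_TIME, key, timestamp);
    }

    /** Sets the context, calls the user function, then clears the context. */
    private void invokeUserFunction(Domain domain, K key, long timestamp) {
        context.domain = domain;
        context.key = key;
        try {
            userFunction.onTimer(timestamp, context, output);
        } finally {
            // Clear the fields so state from this timer cannot leak into a later call.
            context.domain = null;
            context.key = null;
        }
    }

    boolean contextIsCleared() {
        return context.domain == null && context.key == null;
    }
}
```

Keeping the set, call, and reset steps in one place means a new timer kind cannot forget the reset half.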
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383180#comment-16383180 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481

@kl0u I added the comments for `@deprecated` in the javadoc. Let me know if you can merge the two related PRs. Thanks
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382032#comment-16382032 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171566773

--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -66,9 +66,9 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
-* @param processFunction The [[ProcessFunction]] that is called for each element
-* in the stream.
+* @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
--- End diff --

Please also add that the user now should use the new `KeyedProcessFunction` instead.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380275#comment-16380275 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171233273

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -84,18 +86,18 @@
  * elements that have the same key.
  *
  * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
+ * @param <K> The type of the key in the Keyed Stream.
--- End diff --

Revert the renaming from `KEY` to `K`.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380276#comment-16380276 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237136

--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
+    keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
+
+    if (keyedProcessFunction == null) {
+      throw new NullPointerException("ProcessFunction must not be null.")
+    }
--- End diff --

The message now should be `"KeyedProcessFunction must not be null."`
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380279#comment-16380279 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234582

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) {
      * @param The type of elements emitted by the {@code ProcessFunction}.
      *
      * @return The transformed {@link DataStream}.
+     *
+     * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
      */
+    @Deprecated
     @Override
     @Internal
     public SingleOutputStreamOperator process(
             ProcessFunctionprocessFunction,
             TypeInformation outputType) {
-        KeyedProcessOperator operator =
-                new KeyedProcessOperator<>(clean(processFunction));
+        LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
         return transform("Process", outputType, operator);
     }
+    /**
+     * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+     *
+     * The function will be called for every element in the input streams and can produce zero
+     * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+     * function, this function can also query the time and set timers. When reacting to the firing
+     * of set timers the function can directly emit elements and/or register yet more timers.
+     *
+     * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+     *
+     * @param The type of key in {@code KeyedProcessFunction}.
+     *
+     * @param The type of elements emitted by the {@code KeyedProcessFunction}.
+     *
+     * @return The transformed {@link DataStream}.
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
+
+        TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+            keyedProcessFunction,
+            KeyedProcessFunction.class,
+            0,
+            1,
+            TypeExtractor.NO_INDEX,
+            TypeExtractor.NO_INDEX,
+            getType(),
+            Utils.getCallLocationName(),
+            true);
+
+        return process(keyedProcessFunction, outType);
+    }
+
+    /**
+     * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+     *
+     * The function will be called for every element in the input streams and can produce zero
+     * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+     * function, this function can also query the time and set timers. When reacting to the firing
+     * of set timers the function can directly emit elements and/or register yet more timers.
+     *
+     * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+     *
+     * @param outputType {@link TypeInformation} for the result type of the function.
+     *
+     * @param The type of key in {@code KeyedProcessFunction}.
+     *
+     * @param The type of elements emitted by the {@code KeyedProcessFunction}.
+     *
+     * @return The transformed {@link DataStream}.
+     */
+    @Internal
+    public SingleOutputStreamOperator process(
+            KeyedProcessFunction keyedProcessFunction,
--- End diff --

Same here, you do not have to redefine the type of the key.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380273#comment-16380273 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234411

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) {
      * @param The type of elements emitted by the {@code ProcessFunction}.
      *
      * @return The transformed {@link DataStream}.
+     *
+     * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
      */
+    @Deprecated
     @Override
     @Internal
     public SingleOutputStreamOperator process(
             ProcessFunctionprocessFunction,
             TypeInformation outputType) {
-        KeyedProcessOperator operator =
-                new KeyedProcessOperator<>(clean(processFunction));
+        LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
         return transform("Process", outputType, operator);
     }
+    /**
+     * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+     *
+     * The function will be called for every element in the input streams and can produce zero
+     * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+     * function, this function can also query the time and set timers. When reacting to the firing
+     * of set timers the function can directly emit elements and/or register yet more timers.
+     *
+     * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+     *
+     * @param The type of key in {@code KeyedProcessFunction}.
+     *
+     * @param The type of elements emitted by the {@code KeyedProcessFunction}.
+     *
+     * @return The transformed {@link DataStream}.
+     */
+    @PublicEvolving
+    public SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
+
+        TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+            keyedProcessFunction,
+            KeyedProcessFunction.class,
+            0,
--- End diff --

The indices here are not `0` and `1` for input and output type, but `1` and `2`. In the process function it was 0 and 1 because we did not have the key.
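Why the indices shift can be seen with plain Java reflection. The sketch below does not call Flink's `TypeExtractor`; `TwoParamFunction`, `ThreeParamFunction`, and `TypeIndexDemo` are hypothetical stand-ins that only mirror the shape of the type parameter lists. Once a key parameter is added in front, the input and output types move from positions 0 and 1 to positions 1 and 2.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Stand-in with the un-keyed shape: input at index 0, output at index 1. */
abstract class TwoParamFunction<I, O> { }

/** Stand-in with the keyed shape: key at index 0, input at 1, output at 2. */
abstract class ThreeParamFunction<K, I, O> { }

final class TypeIndexDemo {
    private TypeIndexDemo() { }

    /**
     * Returns the type argument at the given index of the object's direct
     * generic superclass. Assumes the object is an instance of a class that
     * directly extends a parameterized type (for example an anonymous subclass).
     */
    static Type typeArgumentAt(Object function, int index) {
        ParameterizedType superType =
            (ParameterizedType) function.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[index];
    }
}
```

For an anonymous `ThreeParamFunction<String, Integer, Long>`, `typeArgumentAt(fn, 0)` yields the key type, so asking for index 0 when the input type is wanted would pick up `String` instead of `Integer`.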
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380278#comment-16380278 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237037

--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
--- End diff --

As in java, you do not need to redefine the `K` here. So you can remove it `def process[R: TypeInformation](`...
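The remark about not redefining the key type applies to Java in the same way as to Scala, and can be illustrated with a small plain-Java sketch. `KeyedFn` and `ToyKeyedStream` are hypothetical stand-ins, not Flink classes. A method that declares only `<R>` reuses the class-level `K`, so the function's key type is tied to the stream's key type.

```java
/** Stand-in for a function type parameterized by key, input, and output. */
interface KeyedFn<K, I, O> {
    O apply(K key, I value);
}

/** Toy keyed stream (not Flink) holding a single keyed element. */
final class ToyKeyedStream<T, K> {
    private final K key;
    private final T element;

    ToyKeyedStream(K key, T element) {
        this.key = key;
        this.element = element;
    }

    /**
     * Declares only R. K is the class-level type parameter, so the compiler
     * checks that the function's key type matches the stream's key type.
     *
     * If this method were declared as <K, R>, the method-level K would shadow
     * the class-level one, and fn.apply(key, element) would no longer compile,
     * because the field key has the class-level type.
     */
    <R> R process(KeyedFn<K, T, R> fn) {
        return fn.apply(key, element);
    }
}
```

With a stream created as `new ToyKeyedStream<Integer, String>("k", 42)`, the lambda passed to `process` receives a `String` key and an `Integer` element without any extra type argument at the call site.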
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380280#comment-16380280 ] ASF GitHub Bot commented on FLINK-8560: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235653 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java --- @@ -132,15 +139,15 @@ public TimerService timerService() { } } - private class OnTimerContextImpl extends ProcessFunction.OnTimerContext{ + private class OnTimerContextImpl extends KeyedProcessFunction .OnTimerContext { --- End diff -- With the proposed changes you can also remove the type parameter from the `.OnTimerContext`. > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380274#comment-16380274 ] ASF GitHub Bot commented on FLINK-8560: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235356 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * A keyed function that processes elements of a stream. + * + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. 
For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. + * + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code KeyedProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. + * + * @param <K> Type of the key. + * @param <I> Type of the input elements. + * @param <O> Type of the output elements. + */ +@PublicEvolving +public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { + + private static final long serialVersionUID = 1L; + + /** +* Process one element from the input stream. +* +* This function can output zero or more elements using the {@link Collector} parameter +* and also update internal state or set timers using the {@link Context} parameter. +* +* @param value The input value. +* @param ctx A {@link Context} that allows querying the timestamp of the element and getting +*a {@link TimerService} for registering timers and querying the time. The +*context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. 
+* +* @param timestamp The timestamp of the firing timer. +* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key +*of the firing timer and getting a {@link TimerService} for registering timers and querying the time. +*The context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380272#comment-16380272 ] ASF GitHub Bot commented on FLINK-8560: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171233924 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java --- @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) { * @param <R> The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. +* +* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)} */ + @Deprecated @Override @Internal public <R> SingleOutputStreamOperator<R> process( ProcessFunction<T, R> processFunction, TypeInformation<R> outputType) { - KeyedProcessOperator<KEY, T, R> operator = - new KeyedProcessOperator<>(clean(processFunction)); + LegacyKeyedProcessOperator<KEY, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction)); return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} +* function, this function can also query the time and set timers. When reacting to the firing +* of set timers the function can directly emit elements and/or register yet more timers. +* +* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream. +* +* @param <K> The type of key in {@code KeyedProcessFunction}. +* +* @param <R> The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. 
+*/ + @PublicEvolving + public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) { + --- End diff -- The signature should be: ```public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {``` You do not have to re-define the type of the key (the `<K>` in the beginning) as we are already in a `KeyedStream` with an already defined type of key. Also remove the corresponding part in the `javadoc`. > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
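The point of the review comment above, that a method inside an already-keyed stream class should reuse the class's key type instead of declaring a fresh one, can be sketched with plain Java generics. The types below (`ToyKeyedFn`, `ToyKeyedStream`, `ShadowingDemo`) are hypothetical stand-ins that only borrow the naming of the discussion; they are not the Flink classes.

```java
// Hypothetical stand-in for a keyed function type; NOT Flink's KeyedProcessFunction.
interface ToyKeyedFn<K, I, O> {
    O apply(K key, I value);
}

// Hypothetical stand-in for a stream that already knows its key type KEY.
final class ToyKeyedStream<T, KEY> {
    private final KEY key;
    private final T element;

    ToyKeyedStream(KEY key, T element) {
        this.key = key;
        this.element = element;
    }

    // Only R is declared on the method. KEY comes from the enclosing class, so the
    // compiler forces the function's key type to match the stream's key type.
    // Declaring the method as <K, R> ... process(ToyKeyedFn<K, T, R> fn) would
    // introduce an unrelated K, and fn.apply(key, element) would no longer compile
    // without a cast, because key is a KEY, not a K.
    <R> R process(ToyKeyedFn<KEY, T, R> fn) {
        return fn.apply(key, element);
    }
}

final class ShadowingDemo {
    static String run() {
        ToyKeyedStream<Integer, String> stream = new ToyKeyedStream<>("user-1", 42);
        // The lambda's key parameter is inferred as String from the stream itself.
        return stream.process((key, value) -> key + ":" + value);
    }
}
```

Besides removing a redundant declaration, this keeps callers from accidentally passing a function written for a different key type.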
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380281#comment-16380281 ] ASF GitHub Bot commented on FLINK-8560: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171235118 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * A keyed function that processes elements of a stream. + * + * For every element in the input stream {@link #processElement(Object, Context, Collector)} + * is invoked. This can produce zero or more elements as output. Implementations can also + * query the time and set timers through the provided {@link Context}. 
For firing timers + * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce + * zero or more elements as output and register further timers. + * + * NOTE: Access to keyed state and timers (which are also scoped to a key) is only + * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}. + * + * NOTE: A {@code KeyedProcessFunction} is always a + * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and + * teardown methods can be implemented. See + * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} + * and {@link org.apache.flink.api.common.functions.RichFunction#close()}. + * + * @param <K> Type of the key. + * @param <I> Type of the input elements. + * @param <O> Type of the output elements. + */ +@PublicEvolving +public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { + + private static final long serialVersionUID = 1L; + + /** +* Process one element from the input stream. +* +* This function can output zero or more elements using the {@link Collector} parameter +* and also update internal state or set timers using the {@link Context} parameter. +* +* @param value The input value. +* @param ctx A {@link Context} that allows querying the timestamp of the element and getting +*a {@link TimerService} for registering timers and querying the time. The +*context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation +* to fail and may trigger recovery. +*/ + public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; + + /** +* Called when a timer set using {@link TimerService} fires. 
+* +* @param timestamp The timestamp of the firing timer. +* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key +*of the firing timer and getting a {@link TimerService} for registering timers and querying the time. +*The context is only valid during the invocation of this method, do not store it. +* @param out The collector for returning result values. +* +* @throws Exception This method may throw exceptions.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380277#comment-16380277 ] ASF GitHub Bot commented on FLINK-8560: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r171236298 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -54,21 +54,20 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] // /** -* Applies the given [[ProcessFunction]] on the input stream, thereby -* creating a transformed output stream. -* -* The function will be called for every element in the stream and can produce -* zero or more output. The function can also query the time and set timers. When -* reacting to the firing of set timers the function can emit yet more elements. -* -* The function will be called for every element in the input streams and can produce zero -* or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] -* function, this function can also query the time and set timers. When reacting to the firing -* of set timers the function can directly emit elements and/or register yet more timers. -* -* @param processFunction The [[ProcessFunction]] that is called for each element -* in the stream. -*/ + * Applies the given [[ProcessFunction]] on the input stream, thereby + * creating a transformed output stream. + * + * The function will be called for every element in the stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. + * + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. 
When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * @param processFunction The [[ProcessFunction]] that is called for each element in the stream. + */ --- End diff -- Revert all reformattings (indent by 1 space) and also add the `@deprecated` annotation with the correct, non-deprecated alternative, as done in the corresponding `java` class. > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
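As a rough illustration of the deprecation pattern the comment above refers to on the Java side, the sketch below pairs the `@Deprecated` annotation with a Javadoc `@deprecated` tag that names the replacement. `ToyStream`, `processKeyed`, and `DeprecationDemo` are hypothetical names invented for this example; the Scala API would use Scala's own `@deprecated` annotation instead, which is not shown here.

```java
import java.lang.reflect.Method;

// Hypothetical class; it only illustrates pairing @Deprecated with a Javadoc pointer.
final class ToyStream {
    /**
     * Legacy entry point, kept for backwards compatibility.
     *
     * @deprecated Use {@link #processKeyed(String)} instead.
     */
    @Deprecated
    String process(String value) {
        // Delegate so both entry points behave identically.
        return processKeyed(value);
    }

    /** Non-deprecated replacement that callers should migrate to. */
    String processKeyed(String value) {
        return "keyed:" + value;
    }
}

final class DeprecationDemo {
    // Reports whether the named single-String-argument method carries @Deprecated.
    static boolean isDeprecated(String methodName) {
        try {
            Method method = ToyStream.class.getDeclaredMethod(methodName, String.class);
            return method.isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The annotation is what compilers and IDEs act on, while the Javadoc tag tells the reader which method to use instead, so the two are normally added together.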
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372176#comment-16372176 ] ASF GitHub Bot commented on FLINK-8560: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5481 Thanks for the review and suggestions. And your comment on `DataStream#process(KeyedProcessFunction)` makes sense, I've removed it. (btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than this PR. Can you take a look at that one?) > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371135#comment-16371135 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571436 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala --- @@ -448,11 +447,35 @@ class DataStreamTest extends AbstractTestBase { val flatMapped = src.keyBy(x => x).process(processFunction) assert(processFunction == getFunctionForDataStream(flatMapped)) + assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_, _, _]]) + } + + /** +* Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is correctly --- End diff -- ditto > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371136#comment-16371136 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169573342 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java --- @@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() { return transform("Process", outputType, operator); } + /** +* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream. +* +* The function will be called for every element in the input streams and can produce zero +* or more output elements. +* +* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream. +* +* @param <K> The type of key in {@code KeyedProcessFunction}. +* +* @param <R> The type of elements emitted by the {@code KeyedProcessFunction}. +* +* @return The transformed {@link DataStream}. +*/ + @PublicEvolving + public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) { --- End diff -- Does it make sense to add `process` with a `KeyedProcessFunction` on non keyed `DataStream`? > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. 
> A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371132#comment-16371132 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571197 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala --- @@ -686,6 +686,27 @@ class DataStream[T](stream: JavaStream[T]) { asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } + /** +* Applies the given [[KeyedProcessFunction]] on the input stream, thereby --- End diff -- ditto > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371137#comment-16371137 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571398 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala --- @@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } - + + /** +* Applies the given [[KeyedProcessFunction]] on the input stream, thereby --- End diff -- ditto > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371134#comment-16371134 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571457 --- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala --- @@ -473,6 +496,28 @@ class DataStreamTest extends AbstractTestBase { assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]]) } + /** +* Verify that a [[DataStream.process(KeyedProcessFunction)]] call is correctly --- End diff -- ditto > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371133#comment-16371133 ] ASF GitHub Bot commented on FLINK-8560: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169571208 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala --- @@ -666,18 +666,18 @@ class DataStream[T](stream: JavaStream[T]) { } /** - * Applies the given [[ProcessFunction]] on the input stream, thereby - * creating a transformed output stream. - * - * The function will be called for every element in the stream and can produce - * zero or more output. - * - * @param processFunction The [[ProcessFunction]] that is called for each element - * in the stream. - */ +* Applies the given [[ProcessFunction]] on the input stream, thereby --- End diff -- Can you revert this formatting? I think proper java docs should be as this was before: ``` /** * */ ``` instead of: ``` /** * */ ``` > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370944#comment-16370944 ] ASF GitHub Bot commented on FLINK-8560: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5481 cc @pnowojski @aljoscha > add KeyedProcessFunction to expose the key in onTimer() and other methods > - > > Key: FLINK-8560 > URL: https://issues.apache.org/jira/browse/FLINK-8560 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Jürgen Thomann >Assignee: Bowen Li >Priority: Major > Fix For: 1.5.0 > > > Currently it is required to store the key of a keyBy() in the processElement > method to have access to it in the OnTimerContext. > This is not so good as you have to check in the processElement method for > every element if the key is already stored and set it if it's not already set. > A possible solution would adding OnTimerContext#getCurrentKey() or a similar > method. Maybe having it in the open() method could maybe work as well. > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369124#comment-16369124 ]
ASF GitHub Bot commented on FLINK-8560:
Github user juergenthomann commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169077241

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
@@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
         return transform("Process", outputType, operator);
     }
+    /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream.
+*
+* @param The type of key in {@code KeyedProcessFunction}.
+*
+* @param The type of elements emitted by the {@code PKeyedProcessFunction}.
--- End diff --

Could this be a typo with **P**KeyedProcessFunction instead of KeyedProcessFunction?
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369066#comment-16369066 ]
ASF GitHub Bot commented on FLINK-8560:
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169060125

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
         }
     }
-    private static class QueryingFlatMapFunction extends ProcessFunction{
+    private static class QueryingFlatMapFunction extends KeyedProcessFunction {
         private static final long serialVersionUID = 1L;
-        private final TimeDomain timeDomain;
+        private final TimeDomain expectedTimeDomain;
         public QueryingFlatMapFunction(TimeDomain timeDomain) {
-            this.timeDomain = timeDomain;
+            this.expectedTimeDomain = timeDomain;
         }
         @Override
         public void processElement(Integer value, Context ctx, Collector out) throws Exception {
-            if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+            if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                 out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
             } else {
                 out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
             }
         }
         @Override
-        public void onTimer(
-                long timestamp,
-                OnTimerContext ctx,
-                Collector out) throws Exception {
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+            // Do nothing
         }
     }
-    private static class TriggeringFlatMapFunction extends ProcessFunction {
+    private static class TriggeringFlatMapFunction extends KeyedProcessFunction {
         private static final long serialVersionUID = 1L;
-        private final TimeDomain timeDomain;
+        private final TimeDomain expectedTimeDomain;
+
+        static final Integer EXPECTED_KEY = 17;
         public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-            this.timeDomain = timeDomain;
+            this.expectedTimeDomain = timeDomain;
         }
         @Override
         public void processElement(Integer value, Context ctx, Collector out) throws Exception {
             out.collect(value);
-            if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+            if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                 ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
             } else {
                 ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
             }
         }
         @Override
-        public void onTimer(
-                long timestamp,
-                OnTimerContext ctx,
-                Collector out) throws Exception {
-
-            assertEquals(this.timeDomain, ctx.timeDomain());
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+            assertEquals(EXPECTED_KEY, ctx.getCurrentKey());
+            assertEquals(expectedTimeDomain, ctx.timeDomain());
             out.collect(1777);
         }
     }
-    private static class TriggeringStatefulFlatMapFunction extends ProcessFunction {
+    private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction {
         private static final long serialVersionUID = 1L;
         private final ValueStateDescriptor state = new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
-        private final TimeDomain timeDomain;
+        private final TimeDomain expectedTimeDomain;
         public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
-            this.timeDomain = timeDomain;
+
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369065#comment-16369065 ]
ASF GitHub Bot commented on FLINK-8560:
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5481#discussion_r169060097

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
         }
     }
-    private static class QueryingFlatMapFunction extends ProcessFunction{
+    private static class QueryingFlatMapFunction extends KeyedProcessFunction {
         private static final long serialVersionUID = 1L;
-        private final TimeDomain timeDomain;
+        private final TimeDomain expectedTimeDomain;
         public QueryingFlatMapFunction(TimeDomain timeDomain) {
-            this.timeDomain = timeDomain;
+            this.expectedTimeDomain = timeDomain;
         }
         @Override
         public void processElement(Integer value, Context ctx, Collector out) throws Exception {
-            if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+            if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
                 out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
             } else {
                 out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
             }
         }
         @Override
-        public void onTimer(
-                long timestamp,
-                OnTimerContext ctx,
-                Collector out) throws Exception {
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+            // Do nothing
         }
     }
-    private static class TriggeringFlatMapFunction extends ProcessFunction {
+    private static class TriggeringFlatMapFunction extends KeyedProcessFunction {
         private static final long serialVersionUID = 1L;
-        private final TimeDomain timeDomain;
+        private final TimeDomain expectedTimeDomain;
+
+        static final Integer EXPECTED_KEY = 17;
--- End diff --

As in the second PR: does this have to be a static field? Or can we initialise it in the constructor? I think it should work with the `expectedKey` set in the constructor as long as this is not an `ITCase` - and it's not, it's using test harness.
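
The review comment above asks whether the expected key could be passed through the constructor instead of living in a static field. A minimal sketch of that shape follows, in plain Java with no Flink dependency. `TimerContext` and `TriggeringFunction` are hypothetical stand-ins invented for this illustration; they are not Flink's `OnTimerContext` or the test class from the pull request, and the real test may well look different.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a timer context that exposes the current key. */
interface TimerContext<K> {
    K getCurrentKey();
}

/** Test function that receives its expected key through the constructor. */
final class TriggeringFunction<K> {
    // Instance field set once per test, instead of a shared static constant.
    private final K expectedKey;

    TriggeringFunction(K expectedKey) {
        this.expectedKey = Objects.requireNonNull(expectedKey);
    }

    /** Checks the key seen by the timer callback, then emits a marker value. */
    void onTimer(long timestamp, TimerContext<K> ctx, List<Integer> out) {
        if (!expectedKey.equals(ctx.getCurrentKey())) {
            throw new AssertionError(
                "expected key " + expectedKey + " but got " + ctx.getCurrentKey());
        }
        out.add(1777);
    }
}

public class ExpectedKeySketch {
    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        // Each test chooses its own expected key when building the function.
        TriggeringFunction<Integer> fn = new TriggeringFunction<>(17);
        fn.onTimer(5L, () -> 17, out);
        System.out.println(out); // prints [1777]
    }
}
```

With the key held per instance, two tests can use different expected keys without sharing state through a static field.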
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366743#comment-16366743 ]
ASF GitHub Bot commented on FLINK-8560:
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5481

I think you can rename the existing one to `LegacyKeyedProcessOperator` or something like this and have a comment that describes the situation.
[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
[ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366201#comment-16366201 ]
ASF GitHub Bot commented on FLINK-8560:
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5481

@aljoscha @pnowojski Guys, quick question. I'm about to develop `KeyedProcessFunction` and its operator in a keyed stream. But I found there's already a `KeyedProcessOperator` which is for `ProcessFunction` in a keyed stream. Shall I create a new operator named something like `KeyedProcessFunctionOperator`?

Thanks, Bowen