[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388031#comment-16388031
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5481


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not so good as you have to check in the processElement method for 
> every element if the key is already stored and set it if it's not already set.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Having it available in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
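The workaround described above, and the improvement the issue asks for, can be modeled without Flink. Below is a minimal, self-contained Java simulation under stated assumptions. `KeyedContext`, `KeyedFn`, `MiniKeyedOperator` and `EmitKeyOnTimer` are hypothetical stand-ins, not Flink classes. The operator remembers which key registered each timer and restores that key before calling `onTimer()`. The user function can then call `getCurrentKey()` instead of caching the key on every `processElement()` call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical context: exposes the key of the element or timer being processed.
interface KeyedContext<K> {
    K getCurrentKey();
    void registerTimer(long timestamp);
}

// Hypothetical user function, loosely shaped like a keyed process function.
interface KeyedFn<K, I, O> {
    void processElement(I value, KeyedContext<K> ctx, List<O> out);
    void onTimer(long timestamp, KeyedContext<K> ctx, List<O> out);
}

// Hypothetical operator: scopes every timer to the key that registered it
// and restores that key as the current key before firing the timer.
final class MiniKeyedOperator<K, I, O> implements KeyedContext<K> {
    private static final class Timer<K> {
        final long timestamp;
        final K key;
        Timer(long timestamp, K key) { this.timestamp = timestamp; this.key = key; }
    }

    private final Function<I, K> keySelector;
    private final KeyedFn<K, I, O> fn;
    private final PriorityQueue<Timer<K>> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer<K> t) -> t.timestamp));
    private K currentKey;

    MiniKeyedOperator(Function<I, K> keySelector, KeyedFn<K, I, O> fn) {
        this.keySelector = keySelector;
        this.fn = fn;
    }

    @Override public K getCurrentKey() { return currentKey; }

    @Override public void registerTimer(long timestamp) {
        timers.add(new Timer<>(timestamp, currentKey)); // timer belongs to the current key
    }

    void processElement(I value, List<O> out) {
        currentKey = keySelector.apply(value); // set by the operator, not by the user
        fn.processElement(value, this, out);
    }

    // Fires every timer due at or before `time`, in timestamp order.
    void advanceTime(long time, List<O> out) {
        while (!timers.isEmpty() && timers.peek().timestamp <= time) {
            Timer<K> timer = timers.poll();
            currentKey = timer.key; // key is known without user-side caching
            fn.onTimer(timer.timestamp, this, out);
        }
    }
}

// Example user function: no key field, no per-element "is the key stored yet?" check.
final class EmitKeyOnTimer implements KeyedFn<String, String, String> {
    @Override
    public void processElement(String value, KeyedContext<String> ctx, List<String> out) {
        // Input format assumed to be "key:timestamp".
        ctx.registerTimer(Long.parseLong(value.split(":")[1]));
    }

    @Override
    public void onTimer(long timestamp, KeyedContext<String> ctx, List<String> out) {
        out.add(ctx.getCurrentKey() + "@" + timestamp);
    }

    static List<String> demo() {
        MiniKeyedOperator<String, String, String> op =
                new MiniKeyedOperator<>(v -> v.split(":")[0], new EmitKeyOnTimer());
        List<String> out = new ArrayList<>();
        op.processElement("a:1", out);
        op.processElement("b:2", out);
        op.advanceTime(10L, out);
        return out; // [a@1, b@2]
    }
}
```

Unlike Flink's timer service, this toy model does not deduplicate timers registered for the same key and timestamp.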



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387512#comment-16387512
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5481
  
Thanks @bowenli86 ! I will merge as soon as Travis gives green.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16387304#comment-16387304
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u @aljoscha I added the Scala example, and I believe the only build 
failure in Travis is unrelated.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385880#comment-16385880
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172136142
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

@bowenli86 As soon as the scala example is added, I can take care of the 
other two comments and merge! Let me know when you update the PR, and thanks 
for the work!


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385877#comment-16385877
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172133573
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
+@Override
+public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
--- End diff --

I believe this is now `public void onTimer(long timestamp, 
OnTimerContext ctx, Collector out)`, right? @kl0u you could fix this 
while merging.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385876#comment-16385876
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135390
  
--- Diff: docs/dev/stream/operators/process_function.md ---
@@ -242,4 +242,17 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
 harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
 depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
-that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
+that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
+
+## The KeyedProcessFunction
+
+`KeyedProcessFunction`, as an extension of `ProcessFunction`, gives access to the key of timers in its `onTimer(...)`
+method.
+
+{% highlight java %}
--- End diff --

Maybe also add Scala example code.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16385875#comment-16385875
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r172135104
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
@@ -70,21 +69,15 @@ public void open() throws Exception {
@Override
public void onEventTime(InternalTimer timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
-   onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-   onTimerContext.timer = timer;
-   userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-   onTimerContext.timeDomain = null;
-   onTimerContext.timer = null;
+   reinitialize(userFunction, TimeDomain.EVENT_TIME, timer);
--- End diff --

Hate to be picky, but I think the name is a bit misleading and we could 
probably put all of this in a method `invokeUserTime()` that does what 
`reinitialise()` and `reset()` do.

@kl0u I think you can quickly fix that when merging.
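The refactoring suggested above, one helper that prepares the timer context, invokes the user function, and clears the context again, can be sketched in plain Java. Everything here is hypothetical: `Domain`, `TimerCtx`, `TimerCallback`, `TimerDispatcher` and the helper name `invokeUserFunction` are illustrative, not the code that was merged. The `try`/`finally` is an extra design choice of this sketch so the shared context is reset even if the callback throws; the quoted diff resets it without one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a time domain flag.
enum Domain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical mutable context, reused across timer firings.
final class TimerCtx {
    Domain domain;
    Long timestamp; // null when no timer is firing
}

// Hypothetical user callback.
interface TimerCallback {
    void onTimer(long timestamp, TimerCtx ctx);
}

final class TimerDispatcher {
    private final TimerCtx ctx = new TimerCtx();
    private final TimerCallback userFunction;

    TimerDispatcher(TimerCallback userFunction) { this.userFunction = userFunction; }

    void onEventTime(long timestamp) { invokeUserFunction(Domain.EVENT_TIME, timestamp); }

    void onProcessingTime(long timestamp) { invokeUserFunction(Domain.PROCESSING_TIME, timestamp); }

    // Single place that sets up, invokes, and tears down the shared context.
    private void invokeUserFunction(Domain domain, long timestamp) {
        ctx.domain = domain;
        ctx.timestamp = timestamp;
        try {
            userFunction.onTimer(timestamp, ctx);
        } finally {
            ctx.domain = null; // do not leak stale state into the next call
            ctx.timestamp = null;
        }
    }

    TimerCtx context() { return ctx; }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        TimerDispatcher d = new TimerDispatcher((ts, c) -> seen.add(c.domain + "@" + c.timestamp));
        d.onEventTime(5L);
        d.onProcessingTime(7L);
        seen.add("after=" + d.context().domain);
        return seen; // [EVENT_TIME@5, PROCESSING_TIME@7, after=null]
    }
}
```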


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383180#comment-16383180
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@kl0u I added the comments for `@deprecated` in the javadoc. Let me know if 
you can merge the two related PRs. Thanks
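In Java, deprecating an API usually combines two pieces. The `@Deprecated` annotation makes the compiler warn callers, and a `@deprecated` Javadoc tag points at the replacement. A minimal sketch follows. `KeyedRecord`, `ValueFn` and `KeyAwareFn` are hypothetical. Adapting the old function to the new interface is only one option; the diff under review instead routes the deprecated method to a separate `LegacyKeyedProcessOperator`.

```java
import java.util.Objects;

// Hypothetical old-style function: sees only the value.
interface ValueFn {
    String apply(String value);
}

// Hypothetical new-style function: also sees the key.
interface KeyAwareFn {
    String apply(String key, String value);
}

final class KeyedRecord {
    private final String key;
    private final String value;

    KeyedRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }

    /**
     * Applies a function that cannot see the key.
     *
     * @deprecated Use {@link #process(KeyAwareFn)} instead, which also receives the key.
     */
    @Deprecated
    String process(ValueFn fn) {
        Objects.requireNonNull(fn, "ValueFn must not be null.");
        // Adapt the old function to the new interface by ignoring the key.
        return process((k, v) -> fn.apply(v));
    }

    /** Applies a function that receives both the key and the value. */
    String process(KeyAwareFn fn) {
        Objects.requireNonNull(fn, "KeyAwareFn must not be null.");
        return fn.apply(key, value);
    }
}
```

Because the two lambda shapes differ in arity, `process(v -> ...)` resolves to the deprecated overload and `process((k, v) -> ...)` to the new one.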


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382032#comment-16382032
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171566773
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -66,9 +66,9 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 * function, this function can also query the time and set timers. When reacting to the firing
 * of set timers the function can directly emit elements and/or register yet more timers.
 *
-* @param processFunction The [[ProcessFunction]] that is called for each element
-*   in the stream.
+* @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
--- End diff --

Please also add that the user now should use the new `KeyedProcessFunction` 
instead.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380275#comment-16380275
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171233273
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -84,18 +86,18 @@
  * elements that have the same key.
  *
  * @param <T> The type of the elements in the Keyed Stream.
- * @param <KEY> The type of the key in the Keyed Stream.
+ * @param <K> The type of the key in the Keyed Stream.
--- End diff --

Revert the renaming from `KEY` to `K`.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380276#comment-16380276
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237136
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
+keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {
+
+if (keyedProcessFunction == null) {
+  throw new NullPointerException("ProcessFunction must not be null.")
+}
--- End diff --

The message now should be `"KeyedProcessFunction must not be null."`


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380279#comment-16380279
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234582
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When reacting to the firing
+* of set timers the function can directly emit elements and/or register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
+
+   TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+   keyedProcessFunction,
+   KeyedProcessFunction.class,
+   0,
+   1,
+   TypeExtractor.NO_INDEX,
+   TypeExtractor.NO_INDEX,
+   getType(),
+   Utils.getCallLocationName(),
+   true);
+
+   return process(keyedProcessFunction, outType);
+   }
+
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When reacting to the firing
+* of set timers the function can directly emit elements and/or register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+*
+* @param outputType {@link TypeInformation} for the result type of the function.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @Internal
+   public  SingleOutputStreamOperator process(
+   KeyedProcessFunction keyedProcessFunction,
--- End diff --

Same here, you do not have to redefine the type of the key.
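The reason for not redefining the key type: inside a class such as `KeyedStream<T, KEY>`, a method-level `<K>` is a fresh type variable with no relation to `KEY`. The compiler therefore cannot check that the function's key type matches the stream's. The self-contained sketch below uses hypothetical stand-ins (`KeyedFunction`, `MiniKeyedStream`), not Flink's classes. It contrasts a method bound to the class-level `KEY` with one that redeclares its own key type.

```java
// Hypothetical stand-in for a keyed process function: key K, input I, output O.
interface KeyedFunction<K, I, O> {
    O apply(K key, I value);
}

// Hypothetical stand-in for a keyed stream holding a single keyed element.
final class MiniKeyedStream<T, KEY> {
    private final KEY key;
    private final T element;

    MiniKeyedStream(KEY key, T element) {
        this.key = key;
        this.element = element;
    }

    // Reuses the class-level KEY: a function with another key type does not compile.
    <R> R process(KeyedFunction<KEY, T, R> fn) {
        return fn.apply(key, element);
    }

    // Redeclares <K>: any key type is accepted, so mismatches surface only at runtime.
    <K, R> R processWithOwnKeyType(KeyedFunction<K, T, R> fn) {
        @SuppressWarnings("unchecked")
        K k = (K) key; // unchecked cast, erased at runtime
        return fn.apply(k, element);
    }

    static boolean mismatchFailsAtRuntime() {
        MiniKeyedStream<Integer, String> stream = new MiniKeyedStream<>("a", 3);
        KeyedFunction<Long, Integer, String> wrongKeyType = (k, v) -> String.valueOf(k + v);
        // stream.process(wrongKeyType) would be rejected by the compiler.
        try {
            stream.processWithOwnKeyType(wrongKeyType);
            return false;
        } catch (ClassCastException e) {
            return true; // the String key reached code that expected a Long
        }
    }
}
```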


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380273#comment-16380273
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171234411
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When reacting to the firing
+* of set timers the function can directly emit elements and/or register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
+
+   TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(
+   keyedProcessFunction,
+   KeyedProcessFunction.class,
+   0,
--- End diff --

The indices here are not `0` and `1` for input and output type, but `1` and 
`2`. In the process function it was 0 and 1 because we did not have the key.
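These index arguments refer to positions in the function's generic signature. With a key parameter in front, input and output shift from positions 0 and 1 to 1 and 2. The sketch below shows this with plain Java reflection on hypothetical stand-in classes (`PlainProcessFn<IN, OUT>`, `KeyedProcessFn<KEY, IN, OUT>`). Flink's `TypeExtractor` does considerably more than this, and the helper `typeArgument` is illustrative only.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins with the same type-parameter order as the classes under review.
abstract class PlainProcessFn<IN, OUT> {}
abstract class KeyedProcessFn<KEY, IN, OUT> {}

// Concrete user functions: input String, output Integer (plus a Long key for the keyed one).
final class PlainLength extends PlainProcessFn<String, Integer> {}
final class KeyedLength extends KeyedProcessFn<Long, String, Integer> {}

final class TypeArgs {
    // Returns the idx-th type argument that fn's class passes to its direct generic superclass.
    static Type typeArgument(Object fn, int idx) {
        ParameterizedType superType = (ParameterizedType) fn.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[idx];
    }
}
```

For `PlainLength`, index 0 is the input (`String`) and index 1 the output (`Integer`). For `KeyedLength`, index 0 is the key (`Long`), so the input and output sit at indices 1 and 2.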


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380278#comment-16380278
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171237037
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+   * Applies the given [[KeyedProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param keyedProcessFunction The [[KeyedProcessFunction]] that is called for each element
+   * in the stream.
+   */
+  @PublicEvolving
+  def process[K, R: TypeInformation](
--- End diff --

As in Java, you do not need to redefine `K` here, so you can remove it: 
`def process[R: TypeInformation](`...


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380280#comment-16380280
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235653
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---
@@ -132,15 +139,15 @@ public TimerService timerService() {
}
}
 
-   private class OnTimerContextImpl extends ProcessFunction.OnTimerContext{
+   private class OnTimerContextImpl extends KeyedProcessFunction.OnTimerContext {
 
--- End diff --

With the proposed changes you can also remove the `` in the 
`.OnTimerContext`.


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380274#comment-16380274
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235356
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A keyed function that processes elements of a stream.
+ *
+ * For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
+ *
+ * NOTE: Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code KeyedProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * NOTE: A {@code KeyedProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param  Type of the key.
+ * @param  Type of the input elements.
+ * @param  Type of the output elements.
+ */
+@PublicEvolving
+public abstract class KeyedProcessFunction extends AbstractRichFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Process one element from the input stream.
+*
+* This function can output zero or more elements using the {@link Collector} parameter
+* and also update internal state or set timers using the {@link Context} parameter.
+*
+* @param value The input value.
+* @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+*a {@link TimerService} for registering timers and querying the time. The
+*context is only valid during the invocation of this method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+*   to fail and may trigger recovery.
+*/
+   public abstract void processElement(I value, Context ctx, Collector out) throws Exception;
+
+   /**
+* Called when a timer set using {@link TimerService} fires.
+*
+* @param timestamp The timestamp of the firing timer.
+* @param ctx An {@link OnTimerContext} that allows querying the timestamp, the {@link TimeDomain}, and the key
+*of the firing timer and getting a {@link TimerService} for registering timers and querying the time.
+*The context is only valid during the invocation of this method, do not store it.
+* @param out The collector for returning result values.
+*
+* @throws Exception This method may throw exceptions. 
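A rough, single-threaded Java model of the contract this Javadoc describes: every element goes through `processElement`, timers remember the key they were registered under, and `onTimer` receives an `OnTimerContext` that can report that key. All names here (`ToyKeyedProcessFunction`, `ToyContext`, `ToyOnTimerContext`, `ToyKeyedRuntime`, `Click`, `FireAfterTen`) are hypothetical; they only echo the shape of the diff and leave out time domains, side outputs, state and fault tolerance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical context types, NOT org.apache.flink classes.
interface ToyContext {
    void registerTimer(long time);              // stand-in for ctx.timerService()
}

interface ToyOnTimerContext<K> extends ToyContext {
    K getCurrentKey();                          // key of the timer that is firing
}

abstract class ToyKeyedProcessFunction<K, I, O> {
    abstract void processElement(I value, ToyContext ctx, List<O> out);

    void onTimer(long timestamp, ToyOnTimerContext<K> ctx, List<O> out) {}
}

// Single-threaded driver: each timer is stored together with the key it belongs to.
final class ToyKeyedRuntime<K, I, O> {
    private static final class Timer<KEY> {
        final long time;
        final KEY key;
        Timer(long time, KEY key) { this.time = time; this.key = key; }
    }

    private final PriorityQueue<Timer<K>> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private final ToyKeyedProcessFunction<K, I, O> fn;
    private final Function<I, K> keySelector;
    final List<O> out = new ArrayList<>();

    ToyKeyedRuntime(ToyKeyedProcessFunction<K, I, O> fn, Function<I, K> keySelector) {
        this.fn = fn;
        this.keySelector = keySelector;
    }

    void process(I value) {
        K key = keySelector.apply(value);       // the keyBy() key of this element
        fn.processElement(value, time -> timers.add(new Timer<>(time, key)), out);
    }

    // Fires every timer due at or before 'now', earliest first.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer<K> due = timers.poll();
            fn.onTimer(due.time, new ToyOnTimerContext<K>() {
                @Override public K getCurrentKey() { return due.key; }
                @Override public void registerTimer(long time) {
                    timers.add(new Timer<>(time, due.key));
                }
            }, out);
        }
    }
}

final class Click {
    final String user;
    final long ts;
    Click(String user, long ts) { this.user = user; this.ts = ts; }
}

// Example function: emit "<key>@<time>" ten time units after each element.
final class FireAfterTen extends ToyKeyedProcessFunction<String, Click, String> {
    @Override
    void processElement(Click c, ToyContext ctx, List<String> out) {
        ctx.registerTimer(c.ts + 10);           // nothing key-related is stashed here
    }

    @Override
    void onTimer(long timestamp, ToyOnTimerContext<String> ctx, List<String> out) {
        out.add(ctx.getCurrentKey() + "@" + timestamp);
    }
}

class ToyKeyedDemo {
    public static void main(String[] args) {
        ToyKeyedRuntime<String, Click, String> rt =
                new ToyKeyedRuntime<>(new FireAfterTen(), c -> c.user);
        rt.process(new Click("alice", 1));
        rt.process(new Click("bob", 5));
        rt.advanceTo(20);
        System.out.println(rt.out);             // [alice@11, bob@15]
    }
}
```

`FireAfterTen` never stores `c.user`; in this model the key reaches `onTimer` only through the context.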

[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380272#comment-16380272
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171233924
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation type) {
 * @param  The type of elements emitted by the {@code ProcessFunction}.
 *
 * @return The transformed {@link DataStream}.
+*
+* @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
 */
+   @Deprecated
@Override
@Internal
public  SingleOutputStreamOperator process(
ProcessFunction processFunction,
TypeInformation outputType) {
 
-   KeyedProcessOperator operator =
-   new KeyedProcessOperator<>(clean(processFunction));
+   LegacyKeyedProcessOperator operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
 
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+* function, this function can also query the time and set timers. When reacting to the firing
+* of set timers the function can directly emit elements and/or register yet more timers.
+*
+* @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
+
--- End diff --

The signature should be:
```
public  SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
```

You do not have to redefine the type of the key (the `` in the 
beginning), as we are already in a `KeyedStream` with an already defined key type.

Also remove the corresponding part in the `javadoc`.
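Why redeclaring the key type matters can be shown without Flink. In the hypothetical `MiniKeyedStream<T, KEY>` below (not Flink's `KeyedStream`), `process` reuses the class's `KEY`, as the reviewer suggests, while `processRedeclared` declares its own `<K, R>`. That method-level `K` is a fresh type variable unrelated to `KEY`, so a function expecting the wrong key type still compiles and fails only at run time. `KeyedFn` is likewise a made-up functional interface standing in for a keyed function.

```java
import java.util.function.Function;

// Hypothetical miniature of a keyed stream holding a single element; only the
// generic signatures are meant to mirror the review discussion.
final class MiniKeyedStream<T, KEY> {
    // Made-up stand-in for a keyed function: (key, element) -> result.
    interface KeyedFn<K, I, O> {
        O apply(K key, I value);
    }

    private final T element;
    private final KEY key;

    MiniKeyedStream(T element, Function<T, KEY> keySelector) {
        this.element = element;
        this.key = keySelector.apply(element);
    }

    // Suggested shape: the key type is already fixed by the stream, so only R is declared.
    <R> R process(KeyedFn<KEY, T, R> fn) {
        return fn.apply(key, element);
    }

    // Shape the review argues against: this K is a fresh type variable, NOT tied to KEY,
    // so the compiler accepts any key type and the cast below is unchecked.
    @SuppressWarnings("unchecked")
    <K, R> R processRedeclared(KeyedFn<K, T, R> fn) {
        return fn.apply((K) key, element);
    }
}

class MiniKeyedStreamDemo {
    public static void main(String[] args) {
        MiniKeyedStream<String, String> stream =
                new MiniKeyedStream<>("hello", v -> v.substring(0, 1));
        System.out.println(stream.process((String k, String v) -> k + ":" + v)); // h:hello

        // Compiles, because K is inferred as Integer, but the real key is a String.
        MiniKeyedStream.KeyedFn<Integer, String, String> wrongKey = (k, v) -> v + (k + 1);
        try {
            stream.processRedeclared(wrongKey);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at run time");
        }
    }
}
```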


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380281#comment-16380281
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171235118
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java ---

[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380277#comment-16380277
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r171236298
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -54,21 +54,20 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   // 

 
   /**
-* Applies the given [[ProcessFunction]] on the input stream, thereby
-* creating a transformed output stream.
-*
-* The function will be called for every element in the stream and can produce
-* zero or more output. The function can also query the time and set timers. When
-* reacting to the firing of set timers the function can emit yet more elements.
-*
-* The function will be called for every element in the input streams and can produce zero
-* or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
-* function, this function can also query the time and set timers. When reacting to the firing
-* of set timers the function can directly emit elements and/or register yet more timers.
-*
-* @param processFunction The [[ProcessFunction]] that is called for each element
-*   in the stream.
-*/
+   * Applies the given [[ProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+   * function, this function can also query the time and set timers. When reacting to the firing
+   * of set timers the function can directly emit elements and/or register yet more timers.
+   *
+   * @param processFunction The [[ProcessFunction]] that is called for each element in the stream.
+   */
--- End diff --

Revert all reformattings (indent by 1 space), and also add the `@deprecated` 
annotation pointing to the correct, non-deprecated alternative, as done in the 
corresponding Java class.
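For reference, the Java side of the pattern the reviewer points to is a deprecated overload that keeps working, carries the `@Deprecated` annotation, and names its replacement in a `@deprecated` Javadoc tag. The sketch below uses a made-up `MiniApi` and `DeprecationProbe` (not Flink's API) whose old entry point adapts a key-less function to the new, key-aware one; in Scala the equivalent marker would be Scala's own `@deprecated` annotation.

```java
import java.lang.reflect.Method;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical API, not Flink's: the old entry point only hands the element to the
// user function; the new one also passes the key.
class MiniApi {
    /**
     * Applies a function that only sees the element.
     *
     * @deprecated Use {@link #process(String, String, BiFunction)} instead.
     */
    @Deprecated
    static String process(String key, String element, Function<String, String> fn) {
        // Delegate so both entry points share one code path; the key is simply ignored.
        return process(key, element, (k, e) -> fn.apply(e));
    }

    /** Applies a function that sees both the key and the element. */
    static String process(String key, String element, BiFunction<String, String, String> fn) {
        return fn.apply(key, element);
    }
}

// Small helper: @Deprecated has runtime retention, so it is visible via reflection.
class DeprecationProbe {
    static boolean isDeprecated(Class<?> type, String name, Class<?>... params) {
        try {
            Method m = type.getDeclaredMethod(name, params);
            return m.isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(MiniApi.process("k", "v", (String e) -> e.toUpperCase()));   // V
        System.out.println(MiniApi.process("k", "v", (String k, String e) -> k + e));   // kv
        System.out.println(isDeprecated(MiniApi.class, "process",
                String.class, String.class, Function.class));                         // true
    }
}
```

Delegating from the deprecated overload, rather than keeping a second implementation, is one design choice among several; the PR itself keeps a separate legacy operator for the old path.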


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372176#comment-16372176
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
Thanks for the review and suggestions. Your comment on 
`DataStream#process(KeyedProcessFunction)` makes sense; I've removed it. 

(btw, I feel https://github.com/apache/flink/pull/5500 is more urgent than 
this PR. Can you take a look at that one?)


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371135#comment-16371135
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571436
  
--- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---
@@ -448,11 +447,35 @@ class DataStreamTest extends AbstractTestBase {
 val flatMapped = src.keyBy(x => x).process(processFunction)
 
 assert(processFunction == getFunctionForDataStream(flatMapped))
+
assert(getOperatorForDataStream(flatMapped).isInstanceOf[LegacyKeyedProcessOperator[_, _, _]])
+  }
+
+  /**
+* Verify that a [[KeyedStream.process(KeyedProcessFunction)]] call is correctly
--- End diff --

ditto


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371136#comment-16371136
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169573342
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---
@@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code KeyedProcessFunction}.
+*
+* @return The transformed {@link DataStream}.
+*/
+   @PublicEvolving
+   public  SingleOutputStreamOperator process(KeyedProcessFunction keyedProcessFunction) {
--- End diff --

Does it make sense to add `process` with a `KeyedProcessFunction` on a 
non-keyed `DataStream`?


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371132#comment-16371132
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571197
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
@@ -686,6 +686,27 @@ class DataStream[T](stream: JavaStream[T]) {
 asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
 
+  /**
+* Applies the given [[KeyedProcessFunction]] on the input stream, thereby
--- End diff --

ditto


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371137#comment-16371137
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571398
  
--- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
@@ -79,7 +78,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
 asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
-  
+
+  /**
+* Applies the given [[KeyedProcessFunction]] on the input stream, thereby
--- End diff --

ditto


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371134#comment-16371134
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571457
  
--- Diff: flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---
@@ -473,6 +496,28 @@ class DataStreamTest extends AbstractTestBase {
 
assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
   }
 
+  /**
+* Verify that a [[DataStream.process(KeyedProcessFunction)]] call is correctly
--- End diff --

ditto


[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371133#comment-16371133
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169571208
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -666,18 +666,18 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
-   * Applies the given [[ProcessFunction]] on the input stream, thereby
-   * creating a transformed output stream.
-   *
-   * The function will be called for every element in the stream and can produce
-   * zero or more output.
-   *
-   * @param processFunction The [[ProcessFunction]] that is called for each element
-   *   in the stream.
-   */
+* Applies the given [[ProcessFunction]] on the input stream, thereby
--- End diff --

Can you revert this formatting? I think proper Javadoc should look as it did 
before:
```
/**
 *
 */
```
instead of:
```
/**
  *
  */
```



[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370944#comment-16370944
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
cc @pnowojski  @aljoscha 



[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369124#comment-16369124
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user juergenthomann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169077241
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 ---
@@ -654,6 +648,63 @@ public ExecutionConfig getExecutionConfig() {
return transform("Process", outputType, operator);
}
 
+   /**
+* Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
+*
+* The function will be called for every element in the input streams and can produce zero
+* or more output elements.
+*
+* @param keyedProcessFunction The {@link ProcessFunction} that is called for each element in the stream.
+*
+* @param  The type of key in {@code KeyedProcessFunction}.
+*
+* @param  The type of elements emitted by the {@code PKeyedProcessFunction}.
--- End diff --

Could this be a typo with **P**KeyedProcessFunction instead of 
KeyedProcessFunction?



[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369066#comment-16369066
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169060125
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
}
}
 
-   private static class QueryingFlatMapFunction extends ProcessFunction {
+   private static class QueryingFlatMapFunction extends KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public QueryingFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, Collector out) throws Exception {
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
} else {
out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
+   public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+   // Do nothing
}
}
 
-   private static class TriggeringFlatMapFunction extends ProcessFunction {
+   private static class TriggeringFlatMapFunction extends KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
+
+   static final Integer EXPECTED_KEY = 17;
 
public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, Collector out) throws Exception {
out.collect(value);
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
} else {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
-
-   assertEquals(this.timeDomain, ctx.timeDomain());
+   public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+   assertEquals(EXPECTED_KEY, ctx.getCurrentKey());
+   assertEquals(expectedTimeDomain, ctx.timeDomain());
out.collect(1777);
}
}
 
-   private static class TriggeringStatefulFlatMapFunction extends ProcessFunction {
+   private static class TriggeringStatefulFlatMapFunction extends KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
private final ValueStateDescriptor state =
new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+  

[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369065#comment-16369065
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r169060097
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 ---
@@ -377,119 +375,114 @@ public T getKey(T value) throws Exception {
}
}
 
-   private static class QueryingFlatMapFunction extends ProcessFunction {
+   private static class QueryingFlatMapFunction extends KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
 
public QueryingFlatMapFunction(TimeDomain timeDomain) {
-   this.timeDomain = timeDomain;
+   this.expectedTimeDomain = timeDomain;
}
 
@Override
public void processElement(Integer value, Context ctx, Collector out) throws Exception {
-   if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+   if (expectedTimeDomain.equals(TimeDomain.EVENT_TIME)) {
out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
} else {
out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
}
}
 
@Override
-   public void onTimer(
-   long timestamp,
-   OnTimerContext ctx,
-   Collector out) throws Exception {
+   public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+   // Do nothing
}
}
 
-   private static class TriggeringFlatMapFunction extends ProcessFunction {
+   private static class TriggeringFlatMapFunction extends KeyedProcessFunction {
 
private static final long serialVersionUID = 1L;
 
-   private final TimeDomain timeDomain;
+   private final TimeDomain expectedTimeDomain;
+
+   static final Integer EXPECTED_KEY = 17;
--- End diff --

As in the second PR: does this have to be a static field? Or can we 
initialise it in the constructor? I think it should work with the `expectedKey` 
set in the constructor as long as this is not an `ITCase` - and it's not, it's 
using the test harness. 
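The suggestion can be sketched without Flink on the classpath. This is a minimal model that assumes, as noted above, that the test harness drives the function object directly rather than running it in an `ITCase`. In the model, the expected key is a constructor argument, and the timer context passes `onTimer()` the key the timer fired for. All names here (`KeyedTimerModel`, `TimerContext`, `TimerCallback`, `TriggeringFunction`, `fireTimer`) are hypothetical stand-ins, not Flink's API. In the real test, the context would be `KeyedProcessFunction`'s `OnTimerContext`, and the operator test harness would fire the timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Flink-free model of the test pattern under review. Every name here is a
// hypothetical stand-in, NOT Flink's API: TimerContext plays the role of
// OnTimerContext, and fireTimer() plays the role of a test harness firing
// one timer for one key.
public class KeyedTimerModel {

    /** Hypothetical stand-in for OnTimerContext: exposes the key the timer fired for. */
    public interface TimerContext<K> {
        K getCurrentKey();
    }

    /** Hypothetical stand-in for the onTimer() callback of a keyed function. */
    public interface TimerCallback<K, O> {
        void onTimer(long timestamp, TimerContext<K> ctx, List<O> out);
    }

    /** Test function whose expected key is injected per instance, not via a static field. */
    public static final class TriggeringFunction implements TimerCallback<Integer, Integer> {
        private final Integer expectedKey;

        public TriggeringFunction(Integer expectedKey) {
            this.expectedKey = expectedKey;
        }

        @Override
        public void onTimer(long timestamp, TimerContext<Integer> ctx, List<Integer> out) {
            // Same check as assertEquals(EXPECTED_KEY, ctx.getCurrentKey()) in the diff,
            // but against the per-instance field.
            if (!Objects.equals(expectedKey, ctx.getCurrentKey())) {
                throw new AssertionError(
                        "expected key " + expectedKey + " but timer fired for " + ctx.getCurrentKey());
            }
            out.add(1777);
        }
    }

    /** Fires one timer for {@code key} directly on the function object and collects its output. */
    public static List<Integer> fireTimer(TimerCallback<Integer, Integer> fn, Integer key, long timestamp) {
        List<Integer> out = new ArrayList<>();
        // The lambda implements TimerContext: the "runtime" supplies the current key.
        fn.onTimer(timestamp, () -> key, out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fireTimer(new TriggeringFunction(17), 17, 5L)); // prints [1777]
    }
}
```

Because each instance carries its own `expectedKey`, different test cases can use different keys without sharing mutable static state.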



[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366743#comment-16366743
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5481
  
I think you can rename the existing one to `LegacyKeyedProcessOperator` or 
something like this and have a comment that describes the situation.



[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366201#comment-16366201
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@aljoscha @pnowojski  Guys, quick question. I'm about to develop 
`KeyedProcessFunction` and its operator in a keyed stream. But I found there's 
already a `KeyedProcessOperator` which is for `ProcessFunction` in a keyed 
stream. Shall I create a new operator named something like 
`KeyedProcessFunctionOperator`?

Thanks,
Bowen

