[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656596#comment-15656596 ]

ASF GitHub Bot commented on FLINK-5012:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2772

    Thanks for reviewing @jgrier!

> Provide Timestamp in TimelyFlatMapFunction
> ------------------------------------------
>
>                 Key: FLINK-5012
>                 URL: https://issues.apache.org/jira/browse/FLINK-5012
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws Exception;
> {code}
> If we add the timestamp, it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that provides access to the timestamp and timer service.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
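The nullable {{Long}} timestamp from the issue description could be used roughly as follows. This is only a sketch: {{TimerService}} and {{Collector}} here are empty stand-ins rather than Flink's classes, the type parameters on {{TimelyFlatMapSketch}} are assumed, and {{NullableTimestampDemo}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins so the sketch compiles without Flink on the classpath.
interface TimerService { }

interface Collector<T> {
    void collect(T element);
}

// Assumed shape of the proposed four-parameter signature; the generics are a guess.
interface TimelyFlatMapSketch<I, O> {
    void flatMap(I value, Long timestamp, TimerService timerService, Collector<O> out) throws Exception;
}

class NullableTimestampDemo {
    static List<String> run(String value, Long timestamp) {
        TimelyFlatMapSketch<String, String> tagger = (v, ts, timers, out) -> {
            // A primitive long cannot express "no timestamp"; the boxed Long can be null.
            if (ts == null) {
                out.collect(v + "@none");
            } else {
                out.collect(v + "@" + ts);
            }
        };
        List<String> results = new ArrayList<>();
        try {
            tagger.flatMap(value, timestamp, new TimerService() { }, results::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("a", 42L));  // element with a timestamp
        System.out.println(run("b", null)); // element without a timestamp
    }
}
```

The null check is the cost of the boxed type: every implementation has to handle the missing-timestamp case explicitly.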
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656592#comment-15656592 ]

ASF GitHub Bot commented on FLINK-5012:
---------------------------------------

Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/2772
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654805#comment-15654805 ]

ASF GitHub Bot commented on FLINK-5012:
---------------------------------------

Github user jgrier commented on the issue:

    https://github.com/apache/flink/pull/2772

    LGTM
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647929#comment-15647929 ]

ASF GitHub Bot commented on FLINK-5012:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/2772

    [FLINK-5012] Expose Timestamp in Timely FlatMap Functions

    This also adds a Context parameter that holds the timestamp, time domain
    and TimerService to declutter the parameter list of the functions.

    R: @StefanRRichter and @kl0u for review, please
    CC: @jgrier

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink timely-timestamp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2772.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2772

commit 3b37adbd87744ff23d075f03d0259b87536265a9
Author: Aljoscha Krettek
Date:   2016-11-08T15:52:21Z

    [FLINK-5012] Expose Timestamp in Timely FlatMap Functions

    This also adds a Context parameter that holds the timestamp, time domain
    and TimerService to declutter the parameter list of the functions.
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643499#comment-15643499 ]

Aljoscha Krettek commented on FLINK-5012:
-----------------------------------------

The discussion was not about a way to emit an element with a timestamp. The question was whether to fold the {{collect()}} method into the {{Context}} or keep a separate {{Collector}} in the interface.
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637246#comment-15637246 ]

Stephan Ewen commented on FLINK-5012:
-------------------------------------

I just wanted the simple way of outputting an element to be there.

If the {{Collector}} were an {{OutputCollector}} (or so) with two methods, {{collect()}} and {{collectWithTimestamp()}}, would that work?
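A two-method collector along the lines suggested in this comment might look like the sketch below. Only the names {{OutputCollector}}, {{collect()}} and {{collectWithTimestamp()}} come from the comment; the parameter lists, {{RecordingCollector}} and {{OutputCollectorDemo}} are assumptions made for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface; the parameter lists are assumptions.
interface OutputCollector<T> {
    void collect(T element);                               // simple path, no explicit timestamp
    void collectWithTimestamp(T element, long timestamp);  // caller supplies the timestamp
}

// Toy implementation that records what was emitted as strings.
class RecordingCollector<T> implements OutputCollector<T> {
    final List<String> emitted = new ArrayList<>();

    @Override
    public void collect(T element) {
        emitted.add(element + "@none");
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        emitted.add(element + "@" + timestamp);
    }
}

class OutputCollectorDemo {
    static List<String> run() {
        RecordingCollector<String> out = new RecordingCollector<>();
        out.collect("a");                   // the simple way of outputting stays available
        out.collectWithTimestamp("b", 7L);  // the timestamped variant sits next to it
        return out.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```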
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636525#comment-15636525 ]

Aljoscha Krettek commented on FLINK-5012:
-----------------------------------------

Yep, but it's different from normal {{FlatMap}} and now you have to do an extra hop, i.e. {{ctx.collector().collect(myElement);}}

[~StephanEwen] what do you think? You were against removing {{Collector}} in the updated {{ProcessWindowFunction}} in [FLIP-2|https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata].
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636428#comment-15636428 ]

Jamie Grier commented on FLINK-5012:
------------------------------------

Okay, makes sense about RuntimeContext. I also like your "ideal" solution best -- or maybe:

{code:java}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
    Long timestamp();
    TimerService timerService();
    Collector collector();
}
{code}
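To see what this variant means for user code, here is a small sketch in which the collector is reached through the context. {{TimerService}} and {{Collector}} are empty stand-ins rather than Flink's classes, the type parameter on {{Context}} is assumed, and {{ContextCollectorDemo}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not Flink's classes.
interface TimerService { }

interface Collector<T> {
    void collect(T element);
}

// Context variant with the collector folded in; the type parameter is an assumption.
interface Context<O> {
    Long timestamp();
    TimerService timerService();
    Collector<O> collector();
}

interface TimelyFlatMapSketch<I, O> {
    void flatMap(I value, Context<O> ctx) throws Exception;
}

class ContextCollectorDemo {
    static List<String> run(String value, Long ts) {
        List<String> results = new ArrayList<>();
        Context<String> ctx = new Context<String>() {
            @Override public Long timestamp() { return ts; }
            @Override public TimerService timerService() { return new TimerService() { }; }
            @Override public Collector<String> collector() { return results::add; }
        };
        // Emitting goes through ctx.collector(), one hop more than a plain Collector parameter.
        TimelyFlatMapSketch<String, String> fn =
            (v, c) -> c.collector().collect(v + "@" + c.timestamp());
        try {
            fn.flatMap(value, ctx);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("x", 5L));
    }
}
```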
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636333#comment-15636333 ]

Aljoscha Krettek commented on FLINK-5012:
-----------------------------------------

I think one reason why not is that the {{RuntimeContext}} is also used for batch API stuff and it is already horribly overloaded. Another reason is that the {{RuntimeContext}} can only be retrieved in a {{RichFunction}}.

My ideal solution would be this:

{code}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
    Long timestamp();
    TimerService timerService();
    void output(OUT value);
}
{code}

I've tried pushing something like this in the past, though, and generally the opinion is against this because it departs from the usual functions that have a {{Collector}}. So it'll probably be more like this in the end:

{code}
void flatMap(I value, Context ctx, Collector out) throws Exception;

interface Context {
    Long timestamp();
    TimerService timerService();
}
{code}

Or we could put the methods of {{TimerService}} directly into the context; however, we would then duplicate code between {{TimelyFlatMap}} and {{TimelyCoFlatMap}}.
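The second variant in this comment, with the {{Collector}} kept as a separate parameter, could be exercised as in the sketch below. {{TimerService}} and {{Collector}} are empty stand-ins rather than Flink's classes, the generics are assumed, and {{SeparateCollectorDemo}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not Flink's classes.
interface TimerService { }

interface Collector<T> {
    void collect(T element);
}

// Context carrying only metadata; output stays on the separate Collector parameter.
interface Context {
    Long timestamp();
    TimerService timerService();
}

interface TimelyFlatMapSketch<I, O> {
    void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
}

class SeparateCollectorDemo {
    static Context contextFor(Long ts) {
        return new Context() {
            @Override public Long timestamp() { return ts; }
            @Override public TimerService timerService() { return new TimerService() { }; }
        };
    }

    static List<String> run(String value, Long ts) {
        // Drops elements that carry no timestamp and tags the rest.
        TimelyFlatMapSketch<String, String> fn = (v, ctx, out) -> {
            if (ctx.timestamp() != null) {
                out.collect(v + "@" + ctx.timestamp());
            }
        };
        List<String> results = new ArrayList<>();
        try {
            fn.flatMap(value, contextFor(ts), results::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run("a", 3L));
        System.out.println(run("b", null));
    }
}
```

Emitting looks the same as in an ordinary flat map ({{out.collect(...)}}), which is the familiarity argument made in the comment.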
[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction
[ https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636266#comment-15636266 ]

Jamie Grier commented on FLINK-5012:
------------------------------------

Definitely +1 to the Context parameter.

I've always thought there should be a way to get the timestamp of the current element in any Function. Should we just add this to the RuntimeContext? Is there a good reason to not do this?