[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656596#comment-15656596
 ] 

ASF GitHub Bot commented on FLINK-5012:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2772
  
Thanks for reviewing @jgrier!


> Provide Timestamp in TimelyFlatMapFunction
> --
>
> Key: FLINK-5012
> URL: https://issues.apache.org/jira/browse/FLINK-5012
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>
> Right now, {{TimelyFlatMapFunction}} does not give the timestamp of the 
> element in {{flatMap()}}.
> The signature is currently this:
> {code}
> void flatMap(I value, TimerService timerService, Collector out) throws Exception;
> {code}
> If we add the timestamp it would become this:
> {code}
> void flatMap(I value, Long timestamp, TimerService timerService, Collector out) throws Exception;
> {code}
> The reason why it's a {{Long}} and not a {{long}} is that an element might
> not have a timestamp; in that case we should hand in {{null}} here.
> This is becoming quite long, so we could add a {{Context}} parameter that
> provides access to the timestamp and timer service.
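
As an illustration of the nullable timestamp described in the issue, here is a minimal Java sketch. It assumes simplified stand-in interfaces declared locally (they reuse the names {{Collector}}, {{TimerService}} and {{TimelyFlatMapFunction}} but are not the real Flink types), and {{TagWithTimestamp}} and {{NullableTimestampDemo}} are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins declared for this sketch; not the real Flink interfaces.
interface Collector<T> {
    void collect(T record);
}

interface TimerService {
    long currentProcessingTime();
}

// The signature proposed in the issue: a boxed Long, so that
// "this element has no timestamp" can be expressed as null.
interface TimelyFlatMapFunction<I, O> {
    void flatMap(I value, Long timestamp, TimerService timerService, Collector<O> out)
            throws Exception;
}

// Hypothetical user function that appends the timestamp to each element.
class TagWithTimestamp implements TimelyFlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Long timestamp, TimerService timerService,
                        Collector<String> out) {
        // Check for null before using the value; unboxing a null Long would throw.
        String ts = (timestamp == null) ? "no-timestamp" : Long.toString(timestamp);
        out.collect(value + "@" + ts);
    }
}

class NullableTimestampDemo {
    static List<String> run() {
        List<String> results = new ArrayList<>();
        TimerService timers = () -> 0L; // dummy timer service, unused by the function
        TagWithTimestamp fn = new TagWithTimestamp();
        fn.flatMap("a", 42L, timers, results::add);  // element with a timestamp
        fn.flatMap("b", null, timers, results::add); // element without a timestamp
        return results;
    }
}
```

With a primitive {{long}} the second call would need a sentinel value instead of {{null}}, which is the trade-off the description refers to.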



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15656592#comment-15656592
 ] 

ASF GitHub Bot commented on FLINK-5012:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2772




[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15654805#comment-15654805
 ] 

ASF GitHub Bot commented on FLINK-5012:
---

Github user jgrier commented on the issue:

https://github.com/apache/flink/pull/2772
  
LGTM




[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647929#comment-15647929
 ] 

ASF GitHub Bot commented on FLINK-5012:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2772

[FLINK-5012] Expose Timestamp in Timely FlatMap Functions

This also adds a Context parameter that holds the timestamp, time domain
and TimerService to declutter the parameter list of the functions.

R: @StefanRRichter and @kl0u for review, please
CC: @jgrier

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink timely-timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2772.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2772


commit 3b37adbd87744ff23d075f03d0259b87536265a9
Author: Aljoscha Krettek 
Date:   2016-11-08T15:52:21Z

[FLINK-5012] Expose Timestamp in Timely FlatMap Functions

This also adds a Context parameter that holds the timestamp, time domain
and TimerService to declutter the parameter list of the functions.






[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-07 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643499#comment-15643499
 ] 

Aljoscha Krettek commented on FLINK-5012:
-

The discussion was not about a way to emit an element with a timestamp. The 
question was whether to fold the {{collect()}} method into the {{Context}} or 
keep a separate {{Collector}} in the interface.



[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15637246#comment-15637246
 ] 

Stephan Ewen commented on FLINK-5012:
-

I just wanted the simple way of outputting an element to be there.

If the {{Collector}} were an {{OutputCollector}} (or so) with two methods,
{{collect()}} and {{collectWithTimestamp()}}, would that work?
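
A rough Java sketch of what such a two-method collector could look like. Only the names {{OutputCollector}}, {{collect()}} and {{collectWithTimestamp()}} come from the comment above; the parameter lists, the local {{Collector}} stand-in and the {{RecordingCollector}} test helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the usual single-method collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical shape of the suggested interface; the signature of
// collectWithTimestamp is an assumption, the thread does not specify it.
interface OutputCollector<T> extends Collector<T> {
    void collectWithTimestamp(T record, long timestamp);
}

// Hypothetical helper that records what was emitted, for demonstration only.
class RecordingCollector implements OutputCollector<String> {
    final List<String> emitted = new ArrayList<>();

    @Override
    public void collect(String record) {
        // The simple way of outputting an element stays available.
        emitted.add(record);
    }

    @Override
    public void collectWithTimestamp(String record, long timestamp) {
        // The timestamp is attached explicitly by the caller.
        emitted.add(record + "@" + timestamp);
    }
}
```

Because {{OutputCollector}} extends {{Collector}} in this sketch, code written against the plain {{collect()}} method would keep working unchanged.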



[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636525#comment-15636525
 ] 

Aljoscha Krettek commented on FLINK-5012:
-

Yep, but it's different from normal {{FlatMap}} and now you have to do an extra 
hop, i.e. {{ctx.collector().collect(myElement);}}

[~StephanEwen] What do you think? You were against removing {{Collector}} in 
the updated {{ProcessWindowFunction}} in 
[FLIP-2|https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata].



[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636428#comment-15636428
 ] 

Jamie Grier commented on FLINK-5012:


Okay, makes sense about RuntimeContext. I also like your "ideal" solution 
best -- or maybe:

{code:java}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
  Collector collector();
}
{code}




[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636333#comment-15636333
 ] 

Aljoscha Krettek commented on FLINK-5012:
-

I think one reason why not is that the {{RuntimeContext}} is also used for 
batch API stuff and it is already horribly overloaded. Another reason is that 
the {{RuntimeContext}} can only be retrieved in a {{RichFunction}}.

My ideal solution would be this:
{code}
void flatMap(I value, Context ctx) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
  void output(OUT value);
}
{code}

I've tried pushing something like this in the past, though, and the general 
opinion was against it because it departs from the usual functions that take a 
{{Collector}}. So it'll probably be more like this in the end:

{code}
void flatMap(I value, Context ctx, Collector out) throws Exception;

interface Context {
  Long timestamp();
  TimerService timerService();
}
{code}

Or we could put the methods of {{TimerService}} directly into the context, but 
then we would duplicate code between {{TimelyFlatMap}} and {{TimelyCoFlatMap}}.
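
To make the second variant (a {{Context}} plus a separate {{Collector}}) concrete, here is a minimal Java sketch of how a user function might look. All interfaces are simplified stand-ins declared locally rather than the real Flink types: the {{TimerService}} stand-in has a single assumed method, and {{ForwardWithTimeout}}, {{ContextDemo}} and the one-minute timeout are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins declared for this sketch; not the real Flink interfaces.
interface Collector<T> {
    void collect(T record);
}

interface TimerService {
    void registerEventTimeTimer(long time);
}

// Context as in the second variant: timestamp and timer service only.
interface Context {
    Long timestamp();            // null when the element has no timestamp
    TimerService timerService();
}

interface TimelyFlatMapFunction<I, O> {
    void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
}

// Hypothetical user function: forwards each element and, when a timestamp is
// present, registers an event-time timer one minute after it.
class ForwardWithTimeout implements TimelyFlatMapFunction<String, String> {
    static final long TIMEOUT_MS = 60_000L;

    @Override
    public void flatMap(String value, Context ctx, Collector<String> out) {
        Long ts = ctx.timestamp();
        if (ts != null) {
            ctx.timerService().registerEventTimeTimer(ts + TIMEOUT_MS);
        }
        out.collect(value); // same call as in an ordinary flatMap
    }
}

class ContextDemo {
    // Builds a Context for one element; in a real runtime the operator would supply it.
    static Context contextFor(Long timestamp, TimerService timers) {
        return new Context() {
            @Override public Long timestamp() { return timestamp; }
            @Override public TimerService timerService() { return timers; }
        };
    }

    // Runs the function on one timestamped and one untimestamped element
    // and returns the timers that were registered.
    static List<Long> registeredTimers() {
        List<Long> timers = new ArrayList<>();
        List<String> output = new ArrayList<>();
        ForwardWithTimeout fn = new ForwardWithTimeout();
        fn.flatMap("a", contextFor(1_000L, timers::add), output::add);
        fn.flatMap("b", contextFor(null, timers::add), output::add);
        return timers;
    }
}
```

In this shape the parameter list stays at three entries no matter what is later added to {{Context}}, while emitting an element still goes through the familiar {{Collector}}.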



[jira] [Commented] (FLINK-5012) Provide Timestamp in TimelyFlatMapFunction

2016-11-04 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636266#comment-15636266
 ] 

Jamie Grier commented on FLINK-5012:


Definitely +1 to the Context parameter

I've always thought there should be a way to get the timestamp of the current 
element in any Function.  Should we just add this to the RuntimeContext?  Is 
there a good reason to not do this?
