[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction after keyBy()

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363537#comment-16363537
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5481

[FLINK-8560] Access to the current key in ProcessFunction after keyBy()

## What is the purpose of the change

Currently, the key of a keyBy() has to be stored in the processElement 
method in order to access it in the OnTimerContext.

This is inconvenient, because processElement has to check for every element 
whether the key is already stored, and set it if it is not.

A possible solution would be to add OnTimerContext#getCurrentKey() or a 
similar method. Making the key available in the open() method might work as well.
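
To make the cost of the workaround concrete, here is a minimal plain-Java sketch. It does not use Flink: `StoredKeyWorkaround`, `KeySlot`, `setCurrentKey` and the two counters are hypothetical names invented for illustration, and the per-key map only imitates the way keyed state is scoped to the current key.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of the workaround; it does not use or reproduce Flink's API. */
final class StoredKeyWorkaround {

    /** Hypothetical stand-in for keyed state: one slot per key. */
    static final class KeySlot {
        String storedKey; // null until the first element for this key arrives
    }

    private final Map<String, KeySlot> slots = new HashMap<>();
    private KeySlot current; // slot of the key currently being processed

    int keyChecks = 0; // how often processElement had to test the slot
    int keyWrites = 0; // how often it actually stored the key

    /** Runtime side (assumed): select the slot for the key before user code runs. */
    void setCurrentKey(String key) {
        current = slots.computeIfAbsent(key, k -> new KeySlot());
    }

    /** User side: every element pays for the "is the key stored yet?" check. */
    void processElement(String keyOfElement) {
        keyChecks++;
        if (current.storedKey == null) {
            current.storedKey = keyOfElement;
            keyWrites++;
        }
    }

    /** User side: the timer callback sees the key only through the stored copy. */
    String onTimer(long timestamp) {
        return current.storedKey + "@" + timestamp;
    }

    public static void main(String[] args) {
        StoredKeyWorkaround w = new StoredKeyWorkaround();
        for (String key : new String[] {"a", "a", "b", "a"}) {
            w.setCurrentKey(key);
            w.processElement(key);
        }
        w.setCurrentKey("a");
        System.out.println(w.onTimer(10L) + " checks=" + w.keyChecks + " writes=" + w.keyWrites);
    }
}
```

In this model the check runs once per element while the write happens only once per key, which is the redundancy a `getCurrentKey()` accessor would remove.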

## Brief change log

added `OnTimerContext#getCurrentKey()`

One limitation is that this implementation of `getCurrentKey()` is currently 
not strongly typed. Declaring the key's type requires adding a new generic type 
parameter to `ProcessFunction`, changing the declaration from 
`ProcessFunction<I, O>` to `ProcessFunction<K, I, O>`. I'm worried this may 
break users' applications, so I decided to make `getCurrentKey()` return an 
`Object`. I'd like to discuss the feasibility of strong typing.
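
The trade-off can be sketched in plain Java without any Flink dependency. The two interfaces below are hypothetical stand-ins, not Flink's `OnTimerContext`; they only contrast an accessor that returns `Object` with one that carries a key type parameter `K`.

```java
/** Plain-Java illustration of untyped versus typed key access; not Flink's API. */
final class KeyTypingSketch {

    /** Hypothetical context whose accessor returns Object, as in this change. */
    interface UntypedTimerContext {
        Object getCurrentKey();
    }

    /** Hypothetical context with an extra type parameter for the key. */
    interface TypedTimerContext<K> {
        K getCurrentKey();
    }

    /** The caller must cast; a wrong guess only fails at runtime. */
    static int keyLengthUntyped(UntypedTimerContext ctx) {
        String key = (String) ctx.getCurrentKey();
        return key.length();
    }

    /** No cast needed; a mismatched key type is rejected by the compiler. */
    static int keyLengthTyped(TypedTimerContext<String> ctx) {
        return ctx.getCurrentKey().length();
    }

    public static void main(String[] args) {
        System.out.println(keyLengthUntyped(() -> "user-1"));
        System.out.println(keyLengthTyped(() -> "user-1"));
    }
}
```

The untyped variant leaves existing function signatures unchanged, at the price of a cast in user code that throws a `ClassCastException` if the key is not of the expected type.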

## Verifying this change

This change is already covered by existing tests, such as 
*KeyedProcessOperatorTest*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8560

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5481


commit c5b8a4f27094b88c8641e2bdd30ea0ca65a7a4be
Author: Bowen Li 
Date:   2018-02-13T06:33:06Z

[FLINK-8560] Access to the current key in ProcessFunction after keyBy()




> Access to the current key in ProcessFunction after keyBy()
> --
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Wish
>  Components: DataStream API
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Minor
>
> Currently, the key of a keyBy() has to be stored in the processElement 
> method in order to access it in the OnTimerContext.
> This is inconvenient, because processElement has to check for every element 
> whether the key is already stored, and set it if it is not.
> A possible solution would be to add OnTimerContext#getCurrentKey() or a similar 
> method. Making the key available in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction after keyBy()

2018-02-12 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360414#comment-16360414
 ] 

Piotr Nowojski commented on FLINK-8560:
---

[~phoenixjiangnan] why would we have to keep all of the keys in memory? 
Couldn't we have the following in KeyedProcessOperator:
{code:java}
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
   collector.setAbsoluteTimestamp(timer.getTimestamp());
   onTime(timer, TimeDomain.EVENT_TIME);
}

@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
   collector.eraseTimestamp();
   onTime(timer, TimeDomain.PROCESSING_TIME);
}

private void onTime(InternalTimer<K, VoidNamespace> timer, TimeDomain timeDomain) throws Exception {
   // The key is looked up lazily, only if the user function asks for it.
   Supplier<K> currentKeyAccessor = () -> (K) getCurrentKey();
   onTimerContext.reinitialize(currentKeyAccessor, timeDomain, timer);
   userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
   onTimerContext.reset();
}
{code}
 

The key would be accessed only when a user explicitly asks for it, so it 
shouldn't cost us anything.
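
The lazy lookup can be checked in isolation with a small plain-Java sketch. Nothing here is Flink code: `CountingKeySource`, `TimerContext` and `fireTimer` are hypothetical names, and the counter exists only to show how often the key is actually resolved.

```java
import java.util.function.Supplier;

/** Plain-Java model of lazy key access through a Supplier; not Flink's classes. */
final class LazyKeySketch {

    /** Hypothetical stand-in for the operator; counts how often the key is resolved. */
    static final class CountingKeySource {
        private final String key;
        int lookups = 0;

        CountingKeySource(String key) {
            this.key = key;
        }

        String getCurrentKey() {
            lookups++;
            return key;
        }
    }

    /** Hypothetical timer context that stores only the accessor, never the key. */
    static final class TimerContext<K> {
        private Supplier<K> keyAccessor;

        void reinitialize(Supplier<K> accessor) {
            this.keyAccessor = accessor;
        }

        K getCurrentKey() {
            if (keyAccessor == null) {
                throw new IllegalStateException("context used outside of a timer callback");
            }
            return keyAccessor.get();
        }

        void reset() {
            this.keyAccessor = null;
        }
    }

    /** Fires one timer and returns how many key lookups it caused. */
    static int fireTimer(CountingKeySource source, boolean userReadsKey) {
        TimerContext<String> ctx = new TimerContext<>();
        ctx.reinitialize(source::getCurrentKey);
        if (userReadsKey) {
            ctx.getCurrentKey(); // stands in for user code calling the accessor
        }
        ctx.reset();
        return source.lookups;
    }
}
```

Calling `fireTimer(source, false)` performs no lookup at all, while `fireTimer(source, true)` performs exactly one, so in this model a timer whose callback never reads the key pays only for creating the accessor.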

 

 

 



[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction after keyBy()

2018-02-11 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360156#comment-16360156
 ] 

Bowen Li commented on FLINK-8560:
-

[~aljoscha] [~pnowojski] I have taken a look at this and am wondering whether 
it is worth it. If we take the approach that Piotr suggested, we need to keep 
the key for every timer explicitly in memory, whether or not users will use it. 
Is that worth the effort and the memory overhead?
