[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method

2019-10-17 Thread vinoyang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954205#comment-16954205 ]

vinoyang commented on FLINK-14428:
--

I read the source code again, you are right. It's not a problem.

> Non-consistency key access in KeyedProcessFunction when use keyed state api 
> in both processElement and onTimer method
> -
>
> Key: FLINK-14428
> URL: https://issues.apache.org/jira/browse/FLINK-14428
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: vinoyang
>Priority: Major
>
> Scenario:
> In {{KeyedProcessFunction}}, using the keyed state API in both {{processElement}}
> and {{onTimer}} may cause inconsistent key access.
> Analysis:
> For timers, in {{InternalTimerServiceImpl}}, the key context is set to the key
> stored in the timer (captured when registerXXXTimeTimer was called) before the
> timer callback is invoked:
> {code:java}
> public void onProcessingTime(long time) throws Exception {
>   // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
>   // inside the callback.
>   nextTimer = null;
>   InternalTimer timer;
>   while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
>     processingTimeTimersQueue.poll();
>     keyContext.setCurrentKey(timer.getKey()); // here: key context taken from the timer
>     triggerTarget.onProcessingTime(timer);
>   }
>   if (timer != null && nextTimer == null) {
>     nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
>   }
> }
> {code}
> For the {{processElement}} method, in {{OneInputStreamTask}} it is called after
> setting the key context:
> {code:java}
> @Override
> public void emitRecord(StreamRecord record) throws Exception {
>   synchronized (lock) {
>     numRecordsIn.inc();
>     operator.setKeyContextElement1(record); // here: key context taken from the record
>     operator.processElement(record);
>   }
> }
> {code}
> The setCurrentKey method in the first code snippet and the
> setKeyContextElement1 method in the second code snippet both end up in the same
> {{AbstractStreamOperator#setCurrentKey}} method. However, they run in
> different threads, and there is only one keyed state backend instance, whose
> current key is changed by {{AbstractStreamOperator#setCurrentKey}}.
> So if we access the keyed state API in both {{processElement}} and {{onTimer}},
> we may read an incorrect state value, because one of these methods may change
> the key while the other is still using it, causing inconsistent key access.
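To make the concern in the description concrete, here is a minimal sketch assuming a toy backend ({{ToyKeyedBackend}} and {{KeyContextSketch}} are hypothetical, not Flink classes) that keeps one mutable current key for all state accesses. It replays, deterministically on a single thread, the interleaving the description worries about: the record path sets key "a", the timer path switches to key "b", and the record path then reads key "b"'s value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model, not Flink's classes: one mutable "current key"
// scopes every keyed state access, like the single keyed state backend
// shared by processElement and onTimer in the description.
public class KeyContextSketch {

    static final class ToyKeyedBackend {
        private final Map<String, Integer> valueState = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) {
            currentKey = key;
        }

        // Like ValueState#value(): implicitly reads the value of the current key.
        Integer value() {
            return valueState.get(currentKey);
        }

        // Like ValueState#update(): implicitly writes the value of the current key.
        void update(Integer v) {
            valueState.put(currentKey, v);
        }
    }

    // Replays, step by step on one thread, the interleaving the issue fears.
    static Integer unsynchronizedInterleaving() {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        backend.setCurrentKey("a");
        backend.update(1);
        backend.setCurrentKey("b");
        backend.update(2);

        backend.setCurrentKey("a"); // record path: setKeyContextElement1(record) for key "a"
        backend.setCurrentKey("b"); // timer path: setCurrentKey(timer.getKey()) for key "b"
        return backend.value();     // record path: processElement reads state, but for "b"
    }

    public static void main(String[] args) {
        System.out.println(unsynchronizedInterleaving()); // prints 2 (key "b"'s value, not 1)
    }
}
```

As the comments in this thread conclude, this interleaving does not happen in practice, because both paths run under the same lock; the sketch only shows why the set-key-then-access sequence would be unsafe without one.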



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method

2019-10-17 Thread vinoyang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953581#comment-16953581 ]

vinoyang commented on FLINK-14428:
--

[~aljoscha] It seems that since Flink 1.9, where we introduced {{MailboxProcessor}},
this is not a problem. Before Flink 1.9 (I am looking at 1.8.2), {{processInput}}
is invoked in the task thread and {{onProcessingTime}} is invoked in the time
service thread, right?
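The mailbox approach mentioned above can be sketched generically in plain Java. This is a hypothetical model ({{MailboxSketch}} and its methods are illustrative, not Flink's {{MailboxProcessor}} API, and it makes no claim about which Flink version routes timers this way): the timer thread never touches the key context, it only enqueues a callback as a "mail", and the task thread runs queued mails between records. Because every key change then happens on one thread, the set-key-then-access sequence cannot be interleaved, even without a lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded mailbox, not Flink's MailboxProcessor API.
public class MailboxSketch {

    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private Thread taskThread;
    private String currentKey;        // the single key context, changed only on the task thread
    private int timerCallbacks;       // timer callbacks executed so far
    private int offTaskThreadCalls;   // key changes made from any other thread (should stay 0)

    private void setKeyAndAccessState(String key, boolean fromTimer) {
        if (Thread.currentThread() != taskThread) {
            offTaskThreadCalls++;
        }
        currentKey = key;             // like setCurrentKey(...)
        // keyed state for currentKey would be read and updated here
        if (fromTimer) {
            timerCallbacks++;
        }
    }

    // Runs every pending mail on the calling (task) thread.
    private void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    // Returns the number of key changes that happened off the task thread.
    int run(int records, int timers) {
        taskThread = Thread.currentThread();
        Thread timerService = new Thread(() -> {
            for (int i = 0; i < timers; i++) {
                // The timer thread only enqueues; it never changes the key itself.
                mailbox.add(() -> setKeyAndAccessState("timer-key", true));
            }
        }, "timer-service");
        timerService.start();

        for (int i = 0; i < records; i++) {
            drainMailbox();                              // pending timer mails first
            setKeyAndAccessState("record-key", false);   // then one record
        }
        try {
            timerService.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the timer thread", e);
        }
        drainMailbox();                                  // mails that arrived after the last record
        return offTaskThreadCalls;
    }

    int timerCallbacks() {
        return timerCallbacks;
    }

    public static void main(String[] args) {
        MailboxSketch sketch = new MailboxSketch();
        System.out.println(sketch.run(1_000, 1_000)); // prints 0
        System.out.println(sketch.timerCallbacks());  // prints 1000
    }
}
```

The design choice here is to move the work rather than guard it: instead of both threads contending for a lock, the second thread hands its callback to the thread that already owns the key context.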



[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method

2019-10-17 Thread Aljoscha Krettek (Jira)


[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953549#comment-16953549 ]

Aljoscha Krettek commented on FLINK-14428:
--

{{onProcessingTime()}} is invoked under the checkpoint lock that 
{{emitRecord()}} also uses, so I don't think there is a problem.
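A minimal sketch of why a shared lock removes the race, assuming a hypothetical model ({{SharedLockSketch}} is illustrative, not Flink code) in which a "task-thread" and a "timer-thread" each set the single shared key and then write and read state for it, all inside {{synchronized}} on one object standing in for the checkpoint lock. The {{Thread.yield()}} between setting the key and using it widens the window, yet no mismatch is observed, because the other thread cannot enter the critical section until the first one leaves it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model, not Flink code: two threads share one key context and
// one lock, standing in for the task thread, the timer thread and the
// checkpoint lock described in the comment.
public class SharedLockSketch {

    private final Object lock = new Object();                  // plays the checkpoint lock
    private final Map<String, String> state = new HashMap<>(); // "keyed state", guarded by lock
    private String currentKey;                                 // shared key context, guarded by lock
    private final AtomicInteger mismatches = new AtomicInteger();

    // Set the key, then write and read state for it, all while holding the lock.
    private void setKeyAndAccessState(String key) {
        synchronized (lock) {
            currentKey = key;                 // like setCurrentKey / setKeyContextElement1
            Thread.yield();                   // widen the window between setting and using the key
            state.put(currentKey, key);
            String seen = state.get(currentKey);
            if (!key.equals(currentKey) || !key.equals(seen)) {
                mismatches.incrementAndGet(); // would mean the other thread switched the key
            }
        }
    }

    // Runs both threads to completion and returns the number of mismatches seen.
    int run(int iterations) {
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                setKeyAndAccessState("record-key");
            }
        }, "task-thread");
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                setKeyAndAccessState("timer-key");
            }
        }, "timer-thread");
        taskThread.start();
        timerThread.start();
        try {
            taskThread.join();
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println(new SharedLockSketch().run(10_000)); // prints 0
    }
}
```

Removing the {{synchronized}} block would make mismatches possible (though not guaranteed in any single run), which is the scenario the original report describes.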



[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method

2019-10-17 Thread vinoyang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953481#comment-16953481 ]

vinoyang commented on FLINK-14428:
--

[~aljoscha] and [~trohrmann] WDYT?
