[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954205#comment-16954205 ]

vinoyang commented on FLINK-14428:
----------------------------------

I read the source code again, you are right. It's not a problem.

> Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
> ----------------------------------
>
>                 Key: FLINK-14428
>                 URL: https://issues.apache.org/jira/browse/FLINK-14428
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: vinoyang
>            Priority: Major
>
> Scenario:
> In {{KeyedProcessFunction}}, using the keyed state API in both {{processElement}}
> and {{onTimer}} may cause inconsistent key access.
>
> Analysis:
> For timers, {{InternalTimerServiceImpl}} sets the key context to the key that
> comes from the timer (registered via registerXXXTimeTimer) before firing it:
> {code:java}
> public void onProcessingTime(long time) throws Exception {
>     // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
>     // inside the callback.
>     nextTimer = null;
>     InternalTimer timer;
>     while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
>         processingTimeTimersQueue.poll();
>         keyContext.setCurrentKey(timer.getKey()); //here
>         triggerTarget.onProcessingTime(timer);
>     }
>     if (timer != null && nextTimer == null) {
>         nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
>     }
> }
> {code}
> For the processElement method, {{OneInputStreamTask}} calls it after
> setting the key context:
> {code:java}
> @Override
> public void emitRecord(StreamRecord record) throws Exception {
>     synchronized (lock) {
>         numRecordsIn.inc();
>         operator.setKeyContextElement1(record); //here
>         operator.processElement(record);
>     }
> }
> {code}
> The setCurrentKey method in the first code snippet and the
> setKeyContextElement1 method in the second code snippet both end up in the same
> {{AbstractStreamOperator#setCurrentKey}} method. However, they run in
> different threads and there is only one keyed state backend instance, and
> {{AbstractStreamOperator#setCurrentKey}} changes the current key of the keyed
> state backend.
> So if we access the keyed state API in both {{processElement}} and {{onTimer}},
> we may get a wrong state value, because one of these methods may change the key
> while the other is running and cause an inconsistency.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953581#comment-16953581 ]

vinoyang commented on FLINK-14428:
----------------------------------

[~aljoscha] It seems that in 1.9+, where we introduced {{MailboxProcessor}}, it is not a problem. Before Flink 1.9 (I am looking at 1.8.2), {{processInput}} is invoked in the task thread and {{onProcessingTime}} is invoked in the time service thread, right?
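To illustrate why a mailbox-style model would rule out the suspected race, here is a minimal sketch in plain Java. It is not Flink's {{MailboxProcessor}}: the class MailboxStyleSketch, its fields, and the use of a single-thread executor as the "mailbox" are all assumptions made for illustration. Two producer threads stand in for record input and the time service, and both only enqueue actions, so setting the key and accessing state always happen on one thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Flink.
class MailboxStyleSketch {
    // Stand-in for the current key of the single keyed state backend.
    // Only the mailbox thread ever touches it.
    private String currentKey;
    private final AtomicInteger mismatches = new AtomicInteger();
    private final Set<String> executingThreads = ConcurrentHashMap.newKeySet();

    // Sets the key, then "accesses state" and checks the key is still the one just set.
    private void setKeyAndAccess(String key) {
        executingThreads.add(Thread.currentThread().getName());
        currentKey = key;
        if (!key.equals(currentKey)) {
            mismatches.incrementAndGet();
        }
    }

    // Returns {number of distinct executing threads, number of key mismatches}.
    static int[] run(int actionsPerProducer) {
        MailboxStyleSketch sketch = new MailboxStyleSketch();
        ExecutorService mailbox = Executors.newSingleThreadExecutor();

        // Both producers only enqueue actions; they never touch currentKey themselves.
        Thread recordProducer = new Thread(() -> {
            for (int i = 0; i < actionsPerProducer; i++) {
                mailbox.execute(() -> sketch.setKeyAndAccess("record-key"));
            }
        });
        Thread timerProducer = new Thread(() -> {
            for (int i = 0; i < actionsPerProducer; i++) {
                mailbox.execute(() -> sketch.setKeyAndAccess("timer-key"));
            }
        });

        recordProducer.start();
        timerProducer.start();
        try {
            recordProducer.join();
            timerProducer.join();
            mailbox.shutdown();
            if (!mailbox.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("mailbox did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {sketch.executingThreads.size(), sketch.mismatches.get()};
    }

    public static void main(String[] args) {
        int[] result = run(10_000);
        System.out.println("threads=" + result[0] + " mismatches=" + result[1]);
    }
}
```

Because every action runs on the executor's single worker thread, no locking around the key is needed in this sketch. Whether and from which release this matches Flink's actual threading model should be checked against the Flink source.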
[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953549#comment-16953549 ]

Aljoscha Krettek commented on FLINK-14428:
------------------------------------------

{{onProcessingTime()}} is invoked under the checkpoint lock that {{emitRecord()}} also uses, so I don't think there is a problem.
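The point about the shared lock can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink code: the class SharedLockKeyContextSketch, the currentKey field, and the two threads are hypothetical stand-ins for the keyed state backend's current key, the task thread, and the time service thread. Since both paths set the key and access state while holding the same lock, neither can observe a key set by the other.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Flink.
class SharedLockKeyContextSketch {
    // Stand-in for the current key of the single keyed state backend.
    private String currentKey;
    // Stand-in for the checkpoint lock shared by emitRecord() and onProcessingTime().
    private final Object lock = new Object();
    private final AtomicInteger mismatches = new AtomicInteger();

    // Sets the key and then "accesses state" while holding the lock,
    // mirroring what both code paths in the issue description do.
    private void setKeyAndAccess(String key) {
        synchronized (lock) {
            currentKey = key;
            Thread.yield(); // invite interleaving; the lock still keeps the pair atomic
            if (!key.equals(currentKey)) {
                mismatches.incrementAndGet();
            }
        }
    }

    // Runs a "task thread" and a "timer thread" concurrently and returns
    // how often a callback saw a key other than the one it had just set.
    static int run(int iterations) {
        SharedLockKeyContextSketch sketch = new SharedLockKeyContextSketch();
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                sketch.setKeyAndAccess("record-key");
            }
        });
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                sketch.setKeyAndAccess("timer-key");
            }
        });
        taskThread.start();
        timerThread.start();
        try {
            taskThread.join();
            timerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + run(100_000));
    }
}
```

With the synchronized block in place, the mismatch count stays at 0. Removing the lock from this sketch would make the read of currentKey racy, which is the situation the issue description was worried about.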
[jira] [Commented] (FLINK-14428) Non-consistency key access in KeyedProcessFunction when use keyed state api in both processElement and onTimer method
[ https://issues.apache.org/jira/browse/FLINK-14428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953481#comment-16953481 ]

vinoyang commented on FLINK-14428:
----------------------------------

[~aljoscha] and [~trohrmann] WDYT?