[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-11-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233224#comment-17233224
 ] 

Jark Wu commented on FLINK-18452:
-

I will take this issue. 

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object (here we call it 
> _StreamExecSortComparator$579_).

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-07-13 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156587#comment-17156587
 ] 

Weike Dong commented on FLINK-18452:


Yes, I agree that we can fix this bug in next major release, i.e. 1.12.0, as 
currently there are no other user reports on this problem, it seems not to be 
that urgent to change existing class structure.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Assignee: Weike Dong
>Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17149048#comment-17149048
 ] 

Jark Wu commented on FLINK-18452:
-

Hi [~kyledong], currently, SQL/Table doesn't provide state compatible across 
major versions. So I think we don't need to care about it. 

But I'm not sure whether it is worth to fix in minor versions. If we need to 
fix it in minor versions, we may need a complex solution. 

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Assignee: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148388#comment-17148388
 ] 

Weike Dong commented on FLINK-18452:


One important thing to note here is backward compatibility. For example, for 
_GeneratedRecordComparator_ instances created in current or earlier Flink 
versions, AFAIK they do not have the proper meta info needed to compare with 
the new one with the meta info.

In order to maintain compatibility, a hacky approach is to extract relevant 
fields from the generated code text, however, this is error-prone and could 
possibly only be used as a fallback approach if no other methods are available.

Or maybe we could add a new class (like _GeneratedRecordComparatorV2_) with the 
required meta info, and "migrate" the current implementation to the new one if 
needed. In this way, we could enumerate all of the comparator code generation 
implementations in existing Flink versions, and provide a robust migration plan.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148348#comment-17148348
 ] 

Weike Dong commented on FLINK-18452:


Thank you [~jark] for the reply, and I agree that the meta info given to 
_org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen_ is 
more suitable for the comparison (maybe there are other classes that have the 
same problem).

I am quite interested in solving this issue, could you please assign this 
ticket to me? Thank you very much : )

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-30 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148332#comment-17148332
 ] 

Jark Wu commented on FLINK-18452:
-

I think this is an issue in {{ComparatorWrapper#equals}}, we shouldn't compare 
the generated class name and code, because they are not consistent even in the 
same Flink version. We should compare the sort key fields (the meata 
information used to generate the comparator). 

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes 

[jira] [Commented] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-06-29 Thread Weike Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148289#comment-17148289
 ] 

Weike Dong commented on FLINK-18452:


Hi [~jinyu.zj], would you please be so kind to look at this issus, as you are 
the original author for the code. Thank you very much.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0, 1.10.1, 1.11.0
>Reporter: Weike Dong
>Priority: Major
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using "Top-N" functionality provided by the blink 
> planner, the job state cannot be retrieved because of "incompatible" state 
> serializers (in fact they are compatible).
> The error log is displayed like below
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, it is found to be an issue with the compatibility 
> check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object