Weike Dong created FLINK-18452:
----------------------------------

             Summary: Flaws in RetractableTopNFunction.ComparatorWrapper#equals 
method prevent state access after restoration
                 Key: FLINK-18452
                 URL: https://issues.apache.org/jira/browse/FLINK-18452
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.10.1, 1.10.0, 1.11.0
            Reporter: Weike Dong
         Attachments: 
c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png

We found that in SQL jobs using "Top-N" functionality provided by the blink 
planner, the job state cannot be retrieved because of "incompatible" state 
serializers (in fact they are compatible).

The error log is shown below:
{panel:title=taskmanager.log}
2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - 
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, 
rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], 
select=[appkey, serverid, oid, quantity]) (1/1) 
(bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
java.lang.RuntimeException: Error while getting state
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        at 
org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at 
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 13 more
{panel}
 
After careful debugging, we traced the failure to the compatibility check of 
type serializers.
 
In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
creating a _SortedMapSerializerSnapshot_ object, and the original comparator is 
encapsulated within the object (here we call it _StreamExecSortComparator$579_).
 
At restoration, the object is read and restored as normal. However, during the 
construction of the _RetractableTopNFunction_ instance, Flink provides another 
comparator as an argument (we call it _StreamExecSortComparator$626_). This 
comparator is later used in the _ValueStateDescriptor_, which acts like a key 
to the state store.
 
Here is the problem: when the newly restored Flink program tries to access 
state (_getState_) through the previously mentioned _ValueStateDescriptor_, the 
state backend first checks whether the comparator in the state descriptor is 
compatible with the one in the snapshot. That check eventually reaches the 
_equals_ method of the _RetractableTopNFunction.ComparatorWrapper_ class.
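The check described above can be sketched roughly as follows. This is a simplified model, not Flink's actual code: _Compatibility_, _NamedComparator_, _SnapshotStandIn_ and _check_ are hypothetical stand-ins introduced only for illustration, and the sketch assumes the snapshot decides compatibility by calling _equals_ on the two comparators.

```java
import java.util.Comparator;

public class CompatibilityCheckSketch {

    /** Simplified outcome of a serializer compatibility check (hypothetical). */
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    /** Hypothetical comparator wrapper whose equals() depends on a generated class name. */
    static final class NamedComparator implements Comparator<Long> {
        final String generatedClassName;

        NamedComparator(String generatedClassName) {
            this.generatedClassName = generatedClassName;
        }

        @Override
        public int compare(Long a, Long b) {
            return Long.compare(b, a); // descending, like "orderBy=[quantity DESC]"
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof NamedComparator
                    && generatedClassName.equals(((NamedComparator) o).generatedClassName);
        }

        @Override
        public int hashCode() {
            return generatedClassName.hashCode();
        }
    }

    /** Hypothetical stand-in for a serializer snapshot that remembers its comparator. */
    static final class SnapshotStandIn {
        private final Comparator<Long> restoredComparator;

        SnapshotStandIn(Comparator<Long> restoredComparator) {
            this.restoredComparator = restoredComparator;
        }

        /** Assumption: compatibility is decided purely by comparator equality. */
        Compatibility resolve(Comparator<Long> comparatorFromDescriptor) {
            return restoredComparator.equals(comparatorFromDescriptor)
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    /** Runs the check for a comparator from the snapshot against one from the new job. */
    static Compatibility check(String snapshotClassName, String newClassName) {
        SnapshotStandIn snapshot = new SnapshotStandIn(new NamedComparator(snapshotClassName));
        return snapshot.resolve(new NamedComparator(newClassName));
    }

    public static void main(String[] args) {
        // Same ordering logic, different generated names: reported as incompatible.
        System.out.println(check("StreamExecSortComparator$579", "StreamExecSortComparator$626"));
        // Identical names: reported as compatible.
        System.out.println(check("StreamExecSortComparator$579", "StreamExecSortComparator$579"));
    }
}
```

In this model both comparators sort identically, yet the first check reports an incompatibility, which corresponds to the _StateMigrationException_ in the log above.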
 
In the equals method, here is a code snippet:
{code:java}
return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName())
      && generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode())
      && Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
{code}
After debugging, we found that the class name of the comparator within the 
snapshot is _StreamExecSortComparator$579_, while the class name of the 
comparator provided in the new job is _StreamExecSortComparator$626_. Hence 
this method always returns false, even though the two comparators are in fact 
compatible (they behave the same). Also, because the code of each comparator 
is generated independently, the corresponding variables within the two 
comparators are highly likely to differ (_isNullA$581_ vs _isNullA$682_).
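To make the failure mode concrete, here is a minimal, self-contained sketch. It does not use Flink classes: _GeneratedComparator_ and _generate_ are hypothetical stand-ins, and the generated "code" is only a placeholder string that embeds the counter-based names mentioned above. _strictEquals_ mirrors the three-part comparison quoted from the _equals_ method.

```java
import java.util.Arrays;

public class ComparatorEqualsDemo {

    /** Hypothetical stand-in for a generated comparator: class name, source code, references. */
    static final class GeneratedComparator {
        final String className;
        final String code;
        final Object[] references;

        GeneratedComparator(String className, String code, Object[] references) {
            this.className = className;
            this.code = code;
            this.references = references;
        }
    }

    /**
     * Simulates one independent code generation run. The suffixes stand for the
     * counters that the code generator appends to class and variable names.
     */
    static GeneratedComparator generate(int classSuffix, int variableSuffix) {
        String className = "StreamExecSortComparator$" + classSuffix;
        String code = "public class " + className
                + " { boolean isNullA$" + variableSuffix + "; /* comparison logic */ }";
        return new GeneratedComparator(className, code, new Object[0]);
    }

    /** Mirrors the comparison quoted above: name, code and references must all match. */
    static boolean strictEquals(GeneratedComparator a, GeneratedComparator b) {
        return a.className.equals(b.className)
                && a.code.equals(b.code)
                && Arrays.equals(a.references, b.references);
    }

    public static void main(String[] args) {
        GeneratedComparator fromSnapshot = generate(579, 581);
        GeneratedComparator fromNewJob = generate(626, 682);

        // Same logic, different generated names: the strict comparison fails.
        System.out.println(strictEquals(fromSnapshot, fromNewJob)); // false
        // Only a comparator generated with identical counters compares equal.
        System.out.println(strictEquals(fromSnapshot, generate(579, 581))); // true
    }
}
```

In this sketch the two comparators differ only in their generated suffixes, which is already enough for both the class-name check and the code check to fail.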
 
Hence we believe that the implementation of the _equals_ method has serious 
flaws and should be addressed in later releases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
