For a custom type serializer for state, you could try SimpleVersionedSerializer, Avro, Protobuf, etc.
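A minimal sketch of what a versioned, reflection-free serializer could look like. Everything here is illustrative: VersionedSerializer is a local stand-in that only mirrors the shape of Flink's SimpleVersionedSerializer (getVersion / serialize / deserialize) so the example compiles with the JDK alone, and SeenIds is a hypothetical state type, not the BloomFilter from the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.BitSet;

// Local stand-in mirroring the shape of Flink's SimpleVersionedSerializer
// (assumption: defined here only so the sketch has no Flink dependency).
interface VersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical state type: a bit array plus an insertion counter.
final class SeenIds {
    final BitSet bits;
    final long insertions;

    SeenIds(BitSet bits, long insertions) {
        this.bits = bits;
        this.insertions = insertions;
    }
}

// Writes every field explicitly, so no reflection and no constructor access is needed on restore.
final class SeenIdsSerializer implements VersionedSerializer<SeenIds> {
    private static final int VERSION = 2;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(SeenIds state) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            byte[] raw = state.bits.toByteArray();
            out.writeInt(raw.length);          // length prefix for the bit array
            out.write(raw);
            out.writeLong(state.insertions);   // field added in version 2
        }
        return buffer.toByteArray();
    }

    @Override
    public SeenIds deserialize(int version, byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            byte[] raw = new byte[in.readInt()];
            in.readFully(raw);
            BitSet bits = BitSet.valueOf(raw);
            switch (version) {
                case 1:
                    // Version 1 stored only the bits; fall back to the number of set bits.
                    return new SeenIds(bits, bits.cardinality());
                case 2:
                    return new SeenIds(bits, in.readLong());
                default:
                    throw new IOException("Unknown state version: " + version);
            }
        }
    }
}
```

The explicit version number is what makes a later state upgrade tractable: old bytes are read through the branch for their own version. For the Guava BloomFilter in the quoted message, a serializer along these lines could presumably delegate to BloomFilter.writeTo(OutputStream) and BloomFilter.readFrom(InputStream, Funnel) rather than let Kryo instantiate the internal LongAdder; that variant is untested here.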
For complex state, try to avoid serializers that rely on reflection; the same goes for Java Serializable. Neither makes state upgrades easy.

On January 31, 2021 at 11:29:25, 赵一旦 ([email protected]) wrote:

Does anyone know about this problem?

I have found the cause: the LongAdder class is not public. Is there any solution other than overriding this class? Does Flink itself offer a way to handle this situation?

After all, the classes we need are not always user-defined, so there is no guarantee that they follow particular rules. For the non-public LongAdder in Guava here, overriding the class does solve the problem for now. I am not sure whether there are other ways.

赵一旦 <[email protected]> wrote on Thu, Jan 28, 2021 at 6:03 PM:

> As shown below, I am using Guava's BloomFilter, which appears to be serialized with Kryo. The checkpoint succeeded, but restoring the job from the checkpoint failed.
> The stack trace is below. The key error seems to be that a member with modifiers "public" cannot be accessed?
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:235)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_efc74365c4197a3ac1978016263fc7e7_(5/30) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155)
>     ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:540)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:100)
>     at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:178)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: com.google.common.hash.LongAdder
> Serialization trace:
> bitCount (com.google.common.hash.BloomFilterStrategies$LockFreeBitArray)
> bits (com.google.common.hash.BloomFilter)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:299)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:260)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:160)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 17 more
> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class com.google.common.hash.LongAdder with modifiers "public"
>     at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>     at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
>     at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:413)
>     at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:170)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:133)
>     ... 37 more
