Hi 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema evolution[1]
[1] https://issues.apache.org/jira/browse/FLINK-11947 Best, Congxian claylin <[email protected]> 于2019年11月14日周四 下午9:35写道: > 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > java.lang.RuntimeException: Error while getting state at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > at > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.flink.util.StateMigrationException: The new serializer for a > MapState requires state migration in order for the job to proceed. However, > migration for MapState currently isn't supported. at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > ... 9 more
