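Hi all,
   I am using the State Processor API to create a new savepoint that contains the Kafka-related state, in order to change the max parallelism. The savepoint is created successfully, but when I restart the job from it, the following exception is thrown: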
  {code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e8ea6e352a1a627513ffbd4573fa1628_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more
  {code}
This is how I am using the API:
@Override
public void createNewSavepoint(ExecutionEnvironment env, String savepointPath,
        StateBackend stateBackend, ParameterTool config) {
    // Output path for the new savepoint
    String savepointOutputPath = config.get(EapSavepointConstants.EAP_SAVEPOINT_OUTPUT_PATH);
    // New max parallelism
    int maxParallelism = config.getInt(EapSavepointConstants.EAP_SAVEPOINT_MAX_PARALLELISM);

    try {
        // Load the existing savepoint, whose max parallelism has not been modified yet
        ExistingSavepoint existingSavepoint = Savepoint.load(env, savepointPath, stateBackend);

        // Read the Kafka-related state
        DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaListState = existingSavepoint.readUnionState(
                OperatorUidAndNameConstants.KAFKA_SOURCE_UID,
                StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                KafkaStateUtils.createTypeInformation(),
                KafkaStateUtils.createStateDescriptorSerializer(env.getConfig()));
        logger.info("Print kafka offset");
        kafkaListState.print();

        // Bootstrap the Kafka source operator state for the new savepoint
        BootstrapTransformation<Tuple2<KafkaTopicPartition, Long>> kafkaTransformation =
                OperatorTransformation.bootstrapWith(kafkaListState)
                        .transform(new KafkaSourceBootstrapFunction());

        // Write the new savepoint with the new max parallelism
        Savepoint.create(stateBackend, maxParallelism)
                .withOperator(OperatorUidAndNameConstants.KAFKA_SOURCE_UID, kafkaTransformation)
                .write(savepointOutputPath);
    } catch (IOException e) {
        logger.error("Savepoint load: " + e.getMessage());
        e.printStackTrace();
    } catch (Exception e) {
        logger.error("print state: " + e.getMessage());
        e.printStackTrace();
    }
}

 

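// KafkaStateUtils.java

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaStateUtils {
    /**
     * Creates the state serializer for the (Kafka topic partition, offset) tuple.
     * An explicit state serializer built on KryoSerializer is needed because
     * otherwise users cannot use the 'disableGenericTypes' property with the
     * KafkaConsumer.
     *
     * @param executionConfig the execution config used to configure the KryoSerializer
     * @return the serializer for the (partition, offset) tuple
     */
    public static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateDescriptorSerializer(
            ExecutionConfig executionConfig) {
        // The explicit serializer keeps compatibility with GenericTypeInformation
        // and allows users to call disableGenericTypes
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[]{
                new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
                LongSerializer.INSTANCE
        };
        @SuppressWarnings("unchecked")
        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
        return new TupleSerializer<>(tupleClass, fieldSerializers);
    }

    /** Creates the type information for the (Kafka topic partition, offset) tuple. */
    public static TypeInformation<Tuple2<KafkaTopicPartition, Long>> createTypeInformation() {
        return TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});
    }
}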


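// KafkaSourceBootstrapFunction.java

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaSourceBootstrapFunction
        extends StateBootstrapFunction<Tuple2<KafkaTopicPartition, Long>> {

    private ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    @Override
    public void processElement(Tuple2<KafkaTopicPartition, Long> value, Context context)
            throws Exception {
        // Copy every (partition, offset) pair read from the old savepoint into the new state
        unionOffsetStates.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Intentionally empty
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Register the union list state under the same name and serializer used for reading
        unionOffsetStates = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>>(
                        StateNameConstants.KAFKA_OFFSET_STATE_NAME,
                        KafkaStateUtils.createStateDescriptorSerializer(
                                getRuntimeContext().getExecutionConfig())));
    }
}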


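That is how I am using it. Could anyone offer some advice? After remote debugging, I found that `org.apache.flink.state.api.output.TaggedOperatorSubtaskState` is the only entry that is not deserialized from the new savepoint.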
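
To make the symptom concrete, the sketch below is a simplified, stand-alone model of the kind of check that the `LinkedOptionalMap.unwrapOptionals` frame in the trace points to: each restored entry carries an optional value, and unwrapping fails on the first entry whose value is absent. It is not Flink's actual code. `MissingRegistrationSketch`, `unwrap`, and the `com.example.ResolvedType` key are hypothetical names, and modelling the `TaggedOperatorSubtaskState` entry as "present key, absent value" is an assumption based on the debugging observation above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical model of the failing check; NOT Flink's implementation. */
public class MissingRegistrationSketch {

    /**
     * Unwraps every optional value and fails on the first absent one,
     * mirroring the "Missing value for the key" message seen in the trace.
     */
    public static Map<String, String> unwrap(Map<String, Optional<String>> restored) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<String>> entry : restored.entrySet()) {
            String value = entry.getValue().orElseThrow(() ->
                    new IllegalStateException(
                            "Missing value for the key '" + entry.getKey() + "'"));
            result.put(entry.getKey(), value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Optional<String>> restored = new LinkedHashMap<>();
        // Hypothetical entry whose value was restored successfully.
        restored.put("com.example.ResolvedType", Optional.of("restored"));
        // Assumption: this entry was written into the savepoint but its value
        // could not be restored when the job started from it.
        restored.put("org.apache.flink.state.api.output.TaggedOperatorSubtaskState",
                Optional.empty());
        try {
            unwrap(restored);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the same message as the innermost `Caused by` in the trace, which is consistent with a single unrestorable entry being enough to fail the whole serializer restore.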


Best regards,
clare


