Hey folks, this is a crosspost of a Stack Overflow question (https://stackoverflow.com/questions/68631624/flink-job-cant-use-savepoint-in-a-batch-job) which hasn't received any replies yet, so please bear with me.
Let me start in a generic fashion in case I've missed some concept: I have a streaming Flink job from which I created a savepoint, and I'm trying to reuse that savepoint in the same job running in batch mode. A simplified version of the job looks like this pseudo-code:

```scala
val flink = StreamExecutionEnvironment.getExecutionEnvironment
val stream = if (batchMode) {
  flink.readFile(path)
} else {
  flink.addKafkaSource(topicName)
}
val processed = stream
  .keyBy(key)
  .process(new ProcessorWithKeyedState())
CassandraSink.addSink(processed)
```

This works fine as long as I run the job without a savepoint. If I start the job from a savepoint, I get an exception which looks like this:

```
Caused by: java.lang.UnsupportedOperationException: Checkpoints are not supported in a single key state backend
	at org.apache.flink.streaming.api.operators.sorted.state.NonCheckpointingStorageAccess.resolveCheckpoint(NonCheckpointingStorageAccess.java:43)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1623)
	at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249)
```

I can work around this by setting the option `execution.batch-state-backend.enabled: false`, but that eventually leads to another error:

```
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
	at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:673)
	at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:526)
```

Of course I tried setting the config key `taskmanager.memory.managed.consumer-weights` (I used `DATAPROC:70,PYTHON:30`), but this doesn't seem to have any effect.

So I wonder: do I have a conceptual error, i.e. can savepoints from a streaming job simply not be reused in a batch job, or do I just have a problem in my configuration? Any hints?

Thanks in advance,
Tobi
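PS: In case it helps, this is roughly what my configuration for the workaround attempt looks like (a sketch of the relevant flink-conf.yaml excerpt; the weight values are just what I tried, not something I claim is correct):

```yaml
# flink-conf.yaml (excerpt) – workaround attempt
# disable the single-key batch state backend so savepoint restore is attempted
execution.batch-state-backend.enabled: false
# weights I tried for the managed-memory consumers (illustrative values)
taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
```

I then submit the job with the savepoint via `flink run -s <savepoint-path> ...` (placeholder path) while the job runs in batch mode.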