Hi Salva

The root cause is that you have not distinguished between keyed state and operator state.
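To make the distinction concrete, here is a toy model in plain Scala. It is not Flink's API. `ToyKeyedMapState` and `ToyOperatorListState` are hypothetical classes that only mimic the semantics described in this reply. Keyed `clear()` needs a current key and removes only that key's entries. Operator-state `clear()` has no key to consult and removes everything.

```scala
import scala.collection.mutable

// Toy model (NOT Flink's API) of keyed map state: every access is scoped by a
// "current key" that the runtime sets before a record is processed.
final class ToyKeyedMapState[K, UK, UV] {
  private val table = mutable.Map.empty[K, mutable.Map[UK, UV]]
  private var currentKey: Option[K] = None

  // Stands in for the runtime setting the key before processElement1/2.
  def setCurrentKey(k: K): Unit = currentKey = Some(k)

  // Mimics the failure seen in this thread when no key has been set yet.
  private def key: K = currentKey.getOrElse(
    throw new NullPointerException(
      "No key set. This method should not be called outside of a keyed context."))

  def put(uk: UK, uv: UV): Unit =
    table.getOrElseUpdate(key, mutable.Map.empty[UK, UV]).update(uk, uv)

  // Removes only the entries scoped to the current key.
  def clear(): Unit = { table.remove(key); () }

  def entriesFor(k: K): Map[UK, UV] =
    table.get(k).map(_.toMap).getOrElse(Map.empty[UK, UV])
}

// Toy model of operator list state: there is no current key, so clear()
// simply drops every element held by this parallel instance.
final class ToyOperatorListState[T] {
  private val buf = mutable.ArrayBuffer.empty[T]
  def add(t: T): Unit = { buf += t; () }
  def clear(): Unit = buf.clear()
  def get: List[T] = buf.toList
}
```

Calling `clear()` on the keyed toy before any `setCurrentKey` fails just like the checkpoint in this thread. After records for keys "a" and "b" have been seen, it only wipes whichever key happened to be set last. This is why the entry being cleared is effectively out of your control.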
There is no ‘currentKey’ in operator state, which means PartitionableListState#clear() clears the whole state. In keyed state, however, there is always a ‘currentKey’, which means ‘state#clear()’ only removes the entry scoped to the current runtime key. In your example code, the state to clear is a MapState (not a list state) and therefore must be keyed state. If your job has not processed any record yet, no ‘currentKey’ has been set [1] for the ‘modelsBytes’ state, which leads to the NPE when calling ‘state#clear()’. Moreover, the ‘snapshotState’ and ‘initializeState’ interfaces are mainly used to snapshot and initialize operator state.

Last but not least, even if you could ensure that at least one record is processed before ‘snapshotState’ is called, your program logic would still be unclear: you cannot control which entry in your state gets cleared, because you cannot control the current key, which is set when a record is processed. You could refer to TwoPhaseCommitSinkFunction [2] to see what state can be cleared during snapshotState.

[1] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136
[2] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324

Best,
Yun Tang

From: Congxian Qiu <qcx978132...@gmail.com>
Date: Monday, December 2, 2019 at 10:41 AM
To: Salva Alcántara <salcantara...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

Hi

The exception `No key set. This method should not be called outside of a keyed context.` means that the current key passed in is null. In my opinion, something is wrong if an exception is thrown when no data arrives.
Could you please share the whole stack trace and a minimal reproducible job for this issue?

Best,
Congxian

Salva Alcántara <salcantara...@gmail.com> wrote on Sunday, December 1, 2019 at 3:01 PM:

Given:

```scala
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[(String, String), Array[Byte]](
        "modelsBytes", classOf[(String, String)], classOf[Array[Byte]])
    )
    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }
}
```

It happens that `modelsBytes.clear()` raises an exception when there is no active key. This happens when I start the application from scratch without any data on the input streams. So, when the time for a checkpoint comes, I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine. I am a bit confused about this because `snapshotState` does not provide a keyed context (unlike `processElement1` and `processElement2`, where the current key is accessible via `ctx.getCurrentKey`), so it seems to me that the calls to `clear` and `put` within `snapshotState` should always fail, since they are supposed to work only within a keyed context. Can anyone clarify whether this is actually the expected behaviour?
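Following Yun Tang's point that `snapshotState`/`initializeState` are mainly meant for operator state, one possible shape is to snapshot the in-memory `models` map into non-keyed list state. The sketch below is a plain-Scala toy under that assumption. `ToyListState`, `ToyModel` and `ModelHolder` are hypothetical names, not Flink classes, and the two methods only mirror the shape of `CheckpointedFunction`'s hooks.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an operator ListState (not Flink's class).
final class ToyListState[T] {
  private val buf = mutable.ArrayBuffer.empty[T]
  def clear(): Unit = buf.clear()
  def add(t: T): Unit = { buf += t; () }
  def get: List[T] = buf.toList
}

// Hypothetical model type standing in for the thread's `Model`.
final class ToyModel(val bytes: Array[Byte]) {
  def toBytes: Array[Byte] = bytes.clone()
}

// Mirrors the shape of CheckpointedFunction's two hooks, but backed by
// operator (non-keyed) state, so no current key is ever needed.
final class ModelHolder(state: ToyListState[((String, String), Array[Byte])]) {
  val models = mutable.HashMap.empty[(String, String), ToyModel]

  // Rewrites the whole operator state from the in-memory map; this works
  // even if no record has been processed yet.
  def snapshotState(): Unit = {
    state.clear()
    for ((k, m) <- models) state.add(k -> m.toBytes)
  }

  // Rebuilds the in-memory map from the restored operator state.
  def initializeState(isRestored: Boolean): Unit =
    if (isRestored)
      for ((k, bytes) <- state.get) models.update(k, new ToyModel(bytes))
}
```

In a real job the corresponding state would be obtained with `context.getOperatorStateStore.getListState(...)` inside `initializeState`. Note that on rescaling, plain operator list state is split evenly across subtasks, so a model may end up on a different subtask than the keys that use it, whereas `getUnionListState` gives every subtask the full list. Which one fits depends on whether every subtask may need every model, which the thread does not say.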