Hi Salva

The root cause is a mix-up between keyed state and operator state.

There is no ‘currentKey’ in operator state, which means
PartitionableListState#clear() will clear the whole state. Keyed state, however,
is always accessed under a ‘currentKey’, which means ‘state#clear()’ only
removes the entries scoped to the current runtime key. In your example code, the
state to clear is a MapState (not a list state) and therefore must be a keyed
state. If your job has not processed any record yet, no ‘currentKey’ has been
set [1] for the ‘modelsBytes’ state, which leads to the NPE when calling
‘state#clear()’.
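To make the difference concrete, here is a small pure-Scala model of the two
clear() semantics described above. It is not Flink code: ToyKeyedMapState and
ToyOperatorListState are hypothetical stand-ins that only mimic how a keyed
MapState is scoped to the current key while an operator list state is not. The
exception message is copied from the error reported in this thread.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a keyed MapState (not Flink API): every access is
// scoped to the current key, which Flink sets from the record being processed.
class ToyKeyedMapState[K, UK, UV] {
  private val byKey = mutable.Map.empty[K, mutable.Map[UK, UV]]
  private var currentKey: Option[K] = None

  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // Fails the same way as the report when no key has been set yet.
  private def requireKey(): K = currentKey.getOrElse(
    throw new NullPointerException(
      "No key set. This method should not be called outside of a keyed context."))

  def put(k: UK, v: UV): Unit =
    byKey.getOrElseUpdate(requireKey(), mutable.Map.empty[UK, UV]).update(k, v)

  // Removes only the entries that belong to the current key.
  def clear(): Unit = byKey.remove(requireKey())

  def entriesFor(key: K): Map[UK, UV] =
    byKey.get(key).map(_.toMap).getOrElse(Map.empty[UK, UV])
}

// Hypothetical stand-in for an operator list state (not Flink API): there is
// no key, so clear() drops everything this parallel instance holds.
class ToyOperatorListState[T] {
  private val items = mutable.ArrayBuffer.empty[T]
  def add(v: T): Unit = items += v
  def clear(): Unit = items.clear()
  def get: List[T] = items.toList
}
```

Calling clear() on the keyed model before any setCurrentKey throws the same
NullPointerException as in the original report; once keys "a" and then "b" have
been set, clear() only removes the entries stored under "b".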

Moreover, the ‘snapshotState’ and ‘initializeState’ methods of
CheckpointedFunction are mainly intended for snapshotting and initializing
operator state.

Last but not least, even if you could ensure that at least one record is
processed before ‘snapshotState’ is called, the program logic would still be
unclear: you cannot control which entry in your state gets cleared, because the
current key is whatever the last processed record set it to.
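A rough pure-Scala simulation of that point, assuming the current key simply
keeps whatever value the last processed record set. StaleKeyDemo, processRecord
and snapshot are hypothetical names, not Flink API; snapshot mirrors the
clear-then-put logic from the question.

```scala
import scala.collection.mutable

// Hypothetical simulation, not Flink API: keyed state is modelled as
// record key -> (state key -> value), plus the key left by the last record.
object StaleKeyDemo {
  val keyed = mutable.Map.empty[String, mutable.Map[String, String]]
  var currentKey: Option[String] = None

  // Processing a record sets the current key; nothing resets it afterwards.
  def processRecord(recordKey: String): Unit = currentKey = Some(recordKey)

  // Mirrors the question's snapshotState: clear, then put every model,
  // all under whichever key happens to be current.
  def snapshot(models: Map[String, String]): Unit = {
    val k = currentKey.getOrElse(throw new NullPointerException("No key set."))
    keyed.remove(k)
    keyed(k) = mutable.Map(models.toSeq: _*)
  }
}
```

After records with keys "a" and then "b", a second snapshot only replaces the
copy stored under "b"; the copy written under "a" by the first snapshot stays
behind, stale.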

You could refer to TwoPhaseCommitSinkFunction [2] to see what kind of state can
be cleared during snapshotState.
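TwoPhaseCommitSinkFunction keeps its checkpoint data in an operator ListState
obtained from the operator state store, and its snapshotState simply calls
clear() followed by add(...). The sketch below applies the same clear-then-rebuild
idea to the models map from the question. To keep it runnable without Flink on
the classpath, ListStateLike is a hypothetical trait that only mirrors the
clear(), add() and get() methods of Flink's ListState; InMemoryListState,
ModelSnapshotter and the Array[Byte] encoding of models are likewise
illustrative assumptions.

```scala
import scala.collection.mutable

// Hypothetical trait mirroring the subset of Flink's ListState used here.
trait ListStateLike[T] {
  def clear(): Unit
  def add(value: T): Unit
  def get(): Iterable[T]
}

// In-memory implementation, standing in for operator state in this sketch.
class InMemoryListState[T] extends ListStateLike[T] {
  private val buf = mutable.ArrayBuffer.empty[T]
  def clear(): Unit = buf.clear()
  def add(value: T): Unit = buf += value
  def get(): Iterable[T] = buf.toList
}

// Hypothetical helper: entries are ((String, String), serialized model bytes).
class ModelSnapshotter(state: ListStateLike[((String, String), Array[Byte])]) {

  // Same shape as TwoPhaseCommitSinkFunction#snapshotState: drop the previous
  // snapshot, then add the current contents. No current key is involved.
  def snapshot(models: collection.Map[(String, String), Array[Byte]]): Unit = {
    state.clear()
    for ((k, bytes) <- models) state.add((k, bytes))
  }

  // What initializeState would do when context.isRestored is true.
  def restore(): Map[(String, String), Array[Byte]] = state.get().toMap
}
```

In an actual CheckpointedFunction, the state object would come from
context.getOperatorStateStore.getListState(descriptor) with a
ListStateDescriptor. Operator ListState is split among parallel instances when
the job is rescaled; getUnionListState gives every instance the full list
instead.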

[1] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java#L136
[2] https://github.com/apache/flink/blob/8c6cc4505a4c27daadb00cd94df8a7e955eb8d52/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java#L324

Best
Yun Tang


From: Congxian Qiu <qcx978132...@gmail.com>
Date: Monday, December 2, 2019 at 10:41 AM
To: Salva Alcántara <salcantara...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Using MapState clear, put methods in snapshotState within 
KeyedCoProcessFunction, valid or not?

Hi

The exception `No key set. This method should not be called outside of a
keyed context.` means that the current key passed in is null. In my opinion,
something is wrong if an exception is thrown when no data arrives. Could you
please share the whole stack trace and a minimal reproducible job for this
issue?

Best,
Congxian


On Sunday, December 1, 2019 at 3:01 PM, Salva Alcántara <salcantara...@gmail.com> wrote:
Given:


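```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable

class MyOperator
    extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
    with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: mutable.HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // This raises an exception when there is no active key set
    modelsBytes.clear()
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[(String, String), Array[Byte]](
        "modelsBytes",
        createTypeInformation[(String, String)],
        createTypeInformation[Array[Byte]]
      )
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}
```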

It turns out that `modelsBytes.clear()` raises an exception when there is no
active key. This happens when I start the application from scratch without any
data on the input streams, so when the time for a checkpoint comes, I get this
error:

`java.lang.NullPointerException: No key set. This method should not be called 
outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine. I am
a bit confused about this because `snapshotState` does not provide a keyed
context (unlike `processElement1` and `processElement2`, where the current key
is accessible via `ctx.getCurrentKey`). So it seems to me that the calls to
`clear` and `put` within `snapshotState` should always fail, since they are
supposed to work only within a keyed context. Can anyone clarify whether this
is actually the expected behaviour?
