Using Flink 1.9.2, Java, and the FsStateBackend, I was getting com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException on a value() call on a ValueState variable in a KeyedProcessFunction. The object stored in state contained two PriorityQueue fields, and the error message indicated these were the culprits. I assumed I did not need the concurrent version (PriorityBlockingQueue) because this is keyed state, so only one task can operate on the variable at a time. I also assumed that checkpointing would not access the variable while I was updating it, because a checkpoint would not see what I was doing between the value() and update() operations. Changing to PriorityBlockingQueue fixed the problem.
Given that, could it be that Kryo simply has an easier time with PriorityBlockingQueue's underlying fields, or do we always need to use concurrent versions of objects that are stored in state?
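For reference, the state access pattern described above can be sketched roughly as follows. This is a hypothetical, stdlib-only stand-in: the Map plays the role of per-key ValueState, and the QueueState class and its field names (pending, deadlines) are made up. No Flink or Kryo code is involved, so it only shows the value(), modify, update() sequence on an object holding two PriorityQueue fields, not the serialization failure itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class KeyedQueueStateSketch {

    // Hypothetical state object with two PriorityQueue fields, like the one described above.
    // The reported workaround swapped these for java.util.concurrent.PriorityBlockingQueue.
    public static class QueueState {
        public PriorityQueue<Long> pending = new PriorityQueue<>();
        public PriorityQueue<Long> deadlines = new PriorityQueue<>();
    }

    // Stand-in for keyed ValueState: one QueueState per key. This is NOT Flink's API.
    private final Map<String, QueueState> stateByKey = new HashMap<>();

    // Mirrors the read-modify-write sequence in processElement: value(), mutate, update().
    public void process(String key, long timestamp) {
        QueueState current = stateByKey.get(key);   // analogous to state.value()
        if (current == null) {
            current = new QueueState();              // first element seen for this key
        }
        current.pending.add(timestamp);
        current.deadlines.add(timestamp + 1000L);
        stateByKey.put(key, current);                // analogous to state.update(current)
    }

    // Accessor so callers can inspect the stored state for a key.
    public QueueState stateFor(String key) {
        return stateByKey.get(key);
    }

    public static void main(String[] args) {
        KeyedQueueStateSketch sketch = new KeyedQueueStateSketch();
        sketch.process("a", 5L);
        sketch.process("a", 3L);
        sketch.process("b", 7L);
        System.out.println(sketch.stateFor("a").pending.peek());   // 3
        System.out.println(sketch.stateFor("a").deadlines.peek()); // 1003
        System.out.println(sketch.stateFor("b").pending.size());   // 1
    }
}
```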