Hello,

This happened to me too; here’s the JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

From: bastien dine <bastien.d...@gmail.com>
Sent: Wednesday, February 2, 2022 16:01
To: user <user@flink.apache.org>
Subject: Pojo State Migration - NPE with field deletion

Hello,

I am having trouble restoring POJO state after deleting a field.
According to the documentation, this should not be a problem for POJOs:
"Fields can be removed. Once removed, the previous value for the removed field 
will be dropped in future checkpoints and savepoints."
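
To make the scenario concrete, here is a hypothetical pair of POJO versions. The names `SensorReadingV1`, `SensorReadingV2` and `PojoFieldDiff` are made up for illustration and are not from the failing job. The second version simply drops one field, which according to the documentation quoted above is a supported evolution. In a real job the class keeps its name and is edited in place; two names are used only so both versions can coexist in one file.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical POJO as it was when the savepoint was taken (three fields).
class SensorReadingV1 {
    public String sensorId;
    public long timestamp;
    public double temperature;

    public SensorReadingV1() {}
}

// The same hypothetical POJO after deleting the "timestamp" field.
class SensorReadingV2 {
    public String sensorId;
    public double temperature;

    public SensorReadingV2() {}
}

// Hypothetical helper that lists which fields disappeared between versions.
class PojoFieldDiff {
    // Names of non-static, non-transient fields declared by the class.
    static List<String> stateFields(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod)) {
                names.add(f.getName());
            }
        }
        return names;
    }

    // Fields present in the old version but missing from the new one.
    static List<String> removedFields(Class<?> oldVersion, Class<?> newVersion) {
        List<String> removed = new ArrayList<>(stateFields(oldVersion));
        removed.removeAll(stateFields(newVersion));
        return removed;
    }
}
```

Here `PojoFieldDiff.removedFields(SensorReadingV1.class, SensorReadingV2.class)` yields a single entry, `timestamp`: exactly the kind of removal the documentation says should be handled on restore.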

Here is a short stack trace (the full trace is below):

Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field 
serializer in the corresponding PojoSerializer's "fieldSerializers" array, 
but it is not present in "fields", which has a gap of one index (for 
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.
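
Below is a simplified model of that mismatch in plain Java. It is not Flink's `PojoSerializer` code; `EvolvedPojo`, `GapModel`, `restoredFields` and `makeAccessible` are hypothetical names. It assumes that the restored `fields` array keeps one slot per field of the old schema, with `null` where the removed field used to be (matching the 0-1-3-4 gap observed), while the loop over the fields is bounded by the number of field serializers.

```java
import java.lang.reflect.Field;

// Hypothetical POJO after one of its five original fields ("c") was removed.
class EvolvedPojo {
    public String a;
    public String b;
    public String d;
    public String e;
}

// Simplified, hypothetical model of the two parallel arrays; NOT Flink code.
class GapModel {
    // Builds a 'fields' array aligned with the old schema: one slot per
    // original field name, null where the class no longer has that field.
    static Field[] restoredFields(Class<?> clazz, String[] originalFieldNames) {
        Field[] fields = new Field[originalFieldNames.length];
        for (int i = 0; i < originalFieldNames.length; i++) {
            try {
                fields[i] = clazz.getField(originalFieldNames[i]);
            } catch (NoSuchFieldException removed) {
                fields[i] = null; // the deleted field leaves a gap
            }
        }
        return fields;
    }

    // Walks every slot up to the serializer count and makes each field
    // accessible, returning how many fields were touched.
    static int makeAccessible(Field[] fields, int numFieldSerializers, boolean skipRemoved) {
        int touched = 0;
        for (int i = 0; i < numFieldSerializers; i++) {
            if (skipRemoved && fields[i] == null) {
                continue; // null-safe variant: ignore the gap
            }
            fields[i].setAccessible(true); // NullPointerException at the gap otherwise
            touched++;
        }
        return touched;
    }
}
```

Under these assumptions, `restoredFields(EvolvedPojo.class, new String[] {"a", "b", "c", "d", "e"})` leaves `null` at index 2; passing that array to `makeAccessible(fields, 5, false)` throws a `NullPointerException` at index 2, while `makeAccessible(fields, 5, true)` touches the four remaining fields.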

Why is the deleted field's serializer still present? Shouldn't it have been 
dropped when resolving schema compatibility?
I cannot find anything on this matter; could someone help me with it?
Reproduced in Flink 1.13 and 1.14; I could not find any related JIRA either.

Best Regards,
Bastien

Full stack trace:
2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
    ... 22 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.<init>(RegisteredOperatorStateBackendMetaInfo.java:61)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.deepCopy(RegisteredOperatorStateBackendMetaInfo.java:96)
    at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:63)
    at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:227)
    ... 33 more
