Thanks for the JIRA ticket,

This is definitely critical.
The "workaround" is to not remove the field, but I am not sure this is acceptable :)

I could work on a fix, but someone needs to point me to where to start.
Should I work on the PojoSerializer so that this case no longer throws an exception?
Or should I try to find the root cause, namely why the field serializer of the deleted field is still present?

Regards,

------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Wed, Feb 2, 2022 at 16:37, Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
> Happened to me too, here's the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-21752
>
> Regards,
> Alexis.
>
> *From:* bastien dine <bastien.d...@gmail.com>
> *Sent:* Wednesday, February 2, 2022 16:01
> *To:* user <user@flink.apache.org>
> *Subject:* Pojo State Migration - NPE with field deletion
>
> Hello,
>
> I am having trouble restoring a state (POJO) after deleting a field.
> According to the documentation, this should not be a problem with POJOs:
>
> "Fields can be removed. Once removed, the previous value for the
> removed field will be dropped in future checkpoints and savepoints."
>
> Here is a short stack trace (the full trace is below):
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
> After some debugging, it seems that the deleted POJO field still has a field
> serializer in the "fieldSerializers" array of the corresponding PojoSerializer
> object, but it is not present in "fields", where we have a gap of one index
> (for example 0-1-3-4).
> So when the serializer reaches index 2, we get this NPE.
>
> Why is the serializer of the deleted field still present? It should have been
> dropped when resolving schema compatibility, right?
> I cannot find anything on this matter. Could someone help me with it?
> Reproduced in Flink 1.13 and 1.14; I could not find any related JIRA either.
>
> Best Regards,
> Bastien
>
> Full stack trace:
>
> 2022-02-02 15:44:20
> java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>     ... 22 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.<init>(RegisteredOperatorStateBackendMetaInfo.java:61)
>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.deepCopy(RegisteredOperatorStateBackendMetaInfo.java:96)
>     at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:63)
>     at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:227)
>     ... 33 more
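
To make the debugging observation in the quoted message concrete (a "fields" array with a gap at one index, next to a full "fieldSerializers" array), here is a small standalone sketch. It does not use any Flink classes and is not Flink's actual code: the class `PojoFieldGapSketch`, the POJO `Event`, its field names, and the helper methods are all hypothetical, and the loop in `touchAll` only assumes that something dereferences every entry of the array, which would be consistent with an NPE raised from a constructor.

```java
import java.lang.reflect.Field;

// Hypothetical, simplified model of the situation described in this thread.
// No Flink classes are used; every name here is invented for illustration.
final class PojoFieldGapSketch {

    // Hypothetical POJO after field "c" (formerly at index 2) was deleted.
    static class Event {
        public int a;
        public int b;
        public int d;
        public int e;
    }

    // Resolves field names recorded in an old snapshot against the current class.
    // A name the class no longer declares yields a null entry (the "gap").
    static Field[] resolve(Class<?> clazz, String[] snapshotFieldNames) {
        Field[] fields = new Field[snapshotFieldNames.length];
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(snapshotFieldNames[i]);
            } catch (NoSuchFieldException ex) {
                fields[i] = null; // deleted field: nothing to resolve
            }
        }
        return fields;
    }

    // Returns the index of the first null entry, or -1 if there is none.
    static int firstGap(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                return i;
            }
        }
        return -1;
    }

    // Assumed constructor-style loop: dereferences every entry without a null check.
    static void touchAll(Field[] fields) {
        for (Field field : fields) {
            field.setAccessible(true); // NullPointerException on a gap
        }
    }

    public static void main(String[] args) {
        String[] snapshotNames = {"a", "b", "c", "d", "e"};
        Field[] fields = resolve(Event.class, snapshotNames);
        System.out.println("first gap at index " + firstGap(fields));
        try {
            touchAll(fields);
        } catch (NullPointerException ex) {
            System.out.println("NPE while touching fields");
        }
    }
}
```

In this model, either dropping the stale entry so both arrays stay aligned or skipping null entries in the loop would avoid the exception. Which of the two matches the intended behaviour in PojoSerializer is exactly the open question above.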