Thanks for the JIRA ticket.

This is definitely pretty critical.
The "workaround" is to not remove the field, but I am not sure that is
acceptable :)

I could work on this, but someone would need to point me to where to start.

Should I work on the PojoSerializer so that this case does not throw an
exception?

Or should I try to find the root cause, namely why the serializer of the
deleted field is still present?
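For the first option, here is a minimal sketch of a null guard. It assumes the NPE comes from a constructor loop that calls `setAccessible(true)` on every entry of the `fields` array. `NullTolerantFieldInit`, `MyPojo` and `makeAccessible` are hypothetical names, not Flink code. Skipping null entries only avoids this particular exception. Any other code paths that iterate over `fields` would still need to be checked for the same gap.

```java
import java.lang.reflect.Field;

public class NullTolerantFieldInit {

    // Hypothetical current POJO version: a former field "b" has been removed.
    public static class MyPojo {
        public int a;
        public int c;
    }

    // Hypothetical null-tolerant version of a constructor loop: entries that are null
    // (fields that no longer exist in the class) are skipped instead of dereferenced.
    static int makeAccessible(Field[] fields) {
        int initialized = 0;
        for (Field field : fields) {
            if (field == null) {
                continue; // removed field: nothing to make accessible
            }
            field.setAccessible(true);
            initialized++;
        }
        return initialized;
    }

    public static void main(String[] args) throws NoSuchFieldException {
        // Simulates a restored field array with a gap where the removed field "b" used to be.
        Field[] fields = {
            MyPojo.class.getDeclaredField("a"),
            null,
            MyPojo.class.getDeclaredField("c")
        };
        System.out.println(makeAccessible(fields)); // prints 2
    }
}
```

Here `main` prints 2. The entries for `a` and `c` are made accessible and the gap is skipped.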

Regards,
------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed., Feb. 2, 2022 at 16:37, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> Happened to me too, here’s the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-21752
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* bastien dine <bastien.d...@gmail.com>
> *Sent:* Wednesday, February 2, 2022 16:01
> *To:* user <user@flink.apache.org>
> *Subject:* Pojo State Migration - NPE with field deletion
>
>
>
> Hello,
>
>
>
> I am having trouble restoring a state (POJO) after deleting a field.
>
> According to the documentation, this should not be a problem with POJOs:
>
> *"Fields can be removed. Once removed, the previous value for the
> removed field will be dropped in future checkpoints and savepoints."*
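The documented behavior can be pictured with a minimal sketch. It assumes a simple positional binary layout, which is not Flink's actual POJO format, and the names `DropRemovedFieldDemo`, `Event`, `writeOldRecord` and `readIntoNewVersion` are hypothetical. When bytes written with the old class layout are read into the new class, the removed field's value still has to be read with its old serializer so the stream stays aligned, and the value is then discarded. This is one plausible reason why a restore-time serializer might keep a serializer for a field that no longer exists in the class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DropRemovedFieldDemo {

    // Hypothetical new POJO version: the old String field "comment" has been removed.
    public static class Event {
        public long id;
        public int count;
    }

    // Writes a record in the OLD layout: id, comment, count (positional, no field names).
    static byte[] writeOldRecord(long id, String comment, int count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id);
            out.writeUTF(comment);
            out.writeInt(count);
        }
        return bytes.toByteArray();
    }

    // Reads OLD bytes into the NEW class. The removed field is still read with its old
    // "serializer" (readUTF) to advance the stream, but the value is not stored anywhere.
    static Event readIntoNewVersion(byte[] oldBytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldBytes))) {
            Event event = new Event();
            event.id = in.readLong();
            in.readUTF(); // removed field: consumed and dropped
            event.count = in.readInt();
            return event;
        }
    }

    public static void main(String[] args) throws IOException {
        Event e = readIntoNewVersion(writeOldRecord(42L, "obsolete", 7));
        System.out.println(e.id + " " + e.count); // prints "42 7"
    }
}
```

Without the `readUTF()` call, `readInt()` would start inside the old string's bytes and return a wrong `count`.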
>
>
>
> Here is a short stack trace (the full trace is below):
>
>
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
>
>
>
>
> After some debugging, it seems that the deleted POJO field still has a field
> serializer in the PojoSerializer's "fieldSerializers" array, but it is not
> present in the "fields" array, where there is a gap of one index (for
> example 0-1-3-4).
>
> So when the serializer reaches index 2, we get this NPE.
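The gap can be reproduced outside Flink with a minimal sketch. It assumes, without confirmation from Flink's source, two things. First, the restored serializer resolves the field names recorded in the snapshot against the current class and stores null for a name that no longer exists. Second, the constructor at PojoSerializer.java:119 loops over the number of field serializers and dereferences each entry of `fields`. `RemovedFieldNpeDemo`, `MyPojo`, `resolveFields` and `initFields` are hypothetical names, not Flink APIs.

```java
import java.lang.reflect.Field;

public class RemovedFieldNpeDemo {

    // Hypothetical current POJO version: the field "b" from the old version was removed.
    public static class MyPojo {
        public int a;
        public int c;
    }

    // Resolves the field names recorded in an old snapshot against the current class.
    // A name that no longer exists resolves to null, which leaves a gap in the array.
    static Field[] resolveFields(Class<?> clazz, String[] snapshotFieldNames) {
        Field[] fields = new Field[snapshotFieldNames.length];
        for (int i = 0; i < snapshotFieldNames.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(snapshotFieldNames[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // removed field: a serializer still exists, but no Field
            }
        }
        return fields;
    }

    // Simplified stand-in for a serializer constructor that touches every field,
    // bounded by the number of field serializers rather than by the non-null fields.
    static void initFields(Field[] fields, int numFieldSerializers) {
        for (int i = 0; i < numFieldSerializers; i++) {
            fields[i].setAccessible(true); // throws NullPointerException when fields[i] is null
        }
    }

    public static void main(String[] args) {
        String[] oldNames = {"a", "b", "c"};
        Field[] fields = resolveFields(MyPojo.class, oldNames);
        try {
            initFields(fields, oldNames.length);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE at the removed field's index");
        }
    }
}
```

Running `main` prints "NPE at the removed field's index", because `fields[1]` is null while the loop still visits it.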
>
>
>
> Why is the deleted field's serializer still present? It should have been
> dropped when resolving schema compatibility, right?
>
> I cannot find anything on this matter; could someone help me with it?
>
> Reproduced in Flink 1.13 and 1.14. I could not find any related JIRA ticket either.
>
>
>
> Best Regards,
>
> Bastien
>
>
> Full stack trace:
>
> 2022-02-02 15:44:20
>
> java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.base/java.lang.Thread.run(Unknown Source)
>
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
>     ... 22 more
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.<init>(RegisteredOperatorStateBackendMetaInfo.java:61)
>     at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.deepCopy(RegisteredOperatorStateBackendMetaInfo.java:96)
>     at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:63)
>     at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:227)
>     ... 33 more
>
>
>
