Re: Pojo State Migration - NPE with field deletion

2022-02-03 Thread bastien dine
Thanks for the JIRA ticket.

This is certainly pretty critical.
The "workaround" is to not remove the field, but I am not sure that is
acceptable :)

I could work on that, but someone needs to point me to where to start.

Should I work on the PojoSerializer so that this case does not throw an
exception?

Or should I try to find the root cause, namely why the field serializer of the
deleted field is still present?

Regards,
--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Feb 2, 2022 at 16:37, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hello,
>
>
>
> Happened to me too, here’s the JIRA ticket:
> https://issues.apache.org/jira/browse/FLINK-21752
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* bastien dine 
> *Sent:* Wednesday, February 2, 2022 16:01
> *To:* user 
> *Subject:* Pojo State Migration - NPE with field deletion
>
>
>
> Hello,
>
>
>
> I have some trouble restoring state (a POJO) after deleting a field.
>
> According to the documentation, this should not be a problem with POJOs:
>
> *"Fields can be removed. Once removed, the previous value for the
> removed field will be dropped in future checkpoints and savepoints."*
>
>
>
> Here is a short stack trace (full trace is below) :
>
>
>
> Caused by: java.lang.NullPointerException
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)
>
>
>
>
>
> After some debugging, it seems that the deleted POJO field still has a field
> serializer in the "fieldSerializers" array of the corresponding
> PojoSerializer object.
>
> But it is not present in "fields", where there is a gap of one index (for
> example 0-1-3-4).
>
> So when the serializer reaches index 2, we get this NPE.
>
>
>
> Why is the deleted field's serializer still present? Shouldn't it have been
> dropped when resolving schema compatibility?
>
> I cannot find anything on this matter; could someone help me with it?
>
> Reproduced in Flink 1.13 and 1.14; I could not find any related JIRA either.
>
>
>
> Best Regards,
>
> Bastien
>
>
> Full stack trace:
>
> 2022-02-02 15:44:20
> java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:

RE: Pojo State Migration - NPE with field deletion

2022-02-02 Thread Alexis Sarda-Espinosa
Hello,

Happened to me too, here’s the JIRA ticket: 
https://issues.apache.org/jira/browse/FLINK-21752

Regards,
Alexis.

From: bastien dine 
Sent: Wednesday, February 2, 2022 16:01
To: user 
Subject: Pojo State Migration - NPE with field deletion

Hello,

I have some trouble restoring state (a POJO) after deleting a field.
According to the documentation, this should not be a problem with POJOs:
"Fields can be removed. Once removed, the previous value for the removed field
will be dropped in future checkpoints and savepoints."
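
To make the scenario concrete, a field removal of this kind might look like the sketch below. The class names SessionV1 and SessionV2, their fields, and the helper removedFields are all hypothetical and are not taken from the job in question. In a real job the class would keep its name between versions; two names are used here only so both versions fit in one file.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class PojoEvolutionSketch {

    // Hypothetical state type as it looked when the savepoint was taken.
    public static class SessionV1 {
        public String userId;
        public long clicks;
        public String legacyTag; // the field that is about to be removed

        public SessionV1() {}
    }

    // The same (hypothetical) type after the field has been deleted.
    public static class SessionV2 {
        public String userId;
        public long clicks;

        public SessionV2() {}
    }

    // Collects the names of the public fields declared on a class.
    static List<String> publicFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : type.getFields()) {
            names.add(f.getName());
        }
        return names;
    }

    // Names of fields present in 'before' but missing in 'after'.
    public static List<String> removedFields(Class<?> before, Class<?> after) {
        List<String> afterNames = publicFieldNames(after);
        List<String> removed = new ArrayList<>();
        for (String name : publicFieldNames(before)) {
            if (!afterNames.contains(name)) {
                removed.add(name);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(removedFields(SessionV1.class, SessionV2.class));
    }
}
```

Per the documentation quoted above, restoring a savepoint written with the first shape into a job using the second shape is the case that is expected to work.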

Here is a short stack trace (full trace is below) :

Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:119)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:184)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:83)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.duplicate(StreamElementSerializer.java:46)


After some debugging, it seems that the deleted POJO field still has a field
serializer in the "fieldSerializers" array of the corresponding PojoSerializer
object.
But it is not present in "fields", where there is a gap of one index (for
example 0-1-3-4).
So when the serializer reaches index 2, we get this NPE.
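
The failure mode described above can be modelled in isolation. The sketch below is not Flink's code: the Event class, its fields, and both helper methods are hypothetical, and it assumes only that the constructor dereferences every slot of a java.lang.reflect.Field array, which would be consistent with an NPE raised from the PojoSerializer constructor during duplicate().

```java
import java.lang.reflect.Field;

public class PojoFieldGapSketch {

    // Hypothetical POJO after one field has been deleted from the class.
    public static class Event {
        public long id;
        public String name;
        public double value;
        public long timestamp;
    }

    // Builds a 'fields' array with a hole at index 2, mirroring the
    // 0-1-3-4 layout seen in the debugger.
    public static Field[] fieldsWithGap() {
        try {
            return new Field[] {
                Event.class.getField("id"),
                Event.class.getField("name"),
                null, // slot of the deleted field; its serializer entry still exists
                Event.class.getField("value"),
                Event.class.getField("timestamp"),
            };
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // Mimics a constructor loop that assumes every slot is populated.
    // Returns the index at which the NPE is raised, or -1 if none occurs.
    public static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true); // throws NPE when fields[i] is null
            } catch (NullPointerException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("NPE at index " + firstFailingIndex(fieldsWithGap()));
    }
}
```

Run as is, this prints "NPE at index 2", matching the position of the gap.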

Why is the deleted field's serializer still present? Shouldn't it have been
dropped when resolving schema compatibility?
I cannot find anything on this matter; could someone help me with it?
Reproduced in Flink 1.13 and 1.14; I could not find any related JIRA either.

Best Regards,
Bastien

Full stack trace:
2022-02-02 15:44:20
java.io.IOException: Could not perform checkpoint 2737490 for operator OperatorXXX
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2737490 for operator OperatorXXX
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
at