Hi Kevin,

Great! This gives us confidence to move forward with new releases.
Please let me know if there’s anything I can do to help with checkpoint
file-merging.


Best,
Zakelly


On Wed, Sep 17, 2025 at 8:50 PM Kevin Kim <kevin....@alumni.princeton.edu>
wrote:

> Hi Zakelly,
>
> On my side, we've pulled and deployed the fix from the release-1.20
> branch[1] in our production canary deployments, and we haven't seen any
> NPEs for ~3 days now.
> Thanks so much for the quick fix!
>
> Kevin
>
> On Wed, Sep 10, 2025 at 8:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> We’ve merged the fix into the 1.20 branch, and AFAIK, the community will
>> kick off the next patch release (1.20.3) soon. In the meantime, if you’re
>> interested, you may build from the release-1.20 branch[1] to verify the fix.
>>
>>
>> Thank you again for reporting the issue and for your valuable feedback.
>>
>> [1] https://github.com/apache/flink/tree/release-1.20
>>
>> Best,
>> Zakelly
>>
>> On Wed, Sep 10, 2025 at 1:33 AM Kevin Kim <kevin....@alumni.princeton.edu>
>> wrote:
>>
>>> Thank you! Looking forward to testing the fix on my side.
>>>
>>> Kevin
>>>
>>> On Tue, Sep 9, 2025 at 9:15 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> We've already merged the fix to master; backports are on the way :)
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <
>>>> kevin....@alumni.princeton.edu> wrote:
>>>>
>>>>> Hi Zakelly,
>>>>>
>>>>> Yes, this job contains both operator state and keyed state. Happy to
>>>>> provide more details as needed.
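>>>>>
>>>>> Roughly, the job has the shape of the sketch below (placeholder class and
>>>>> state names with trivial logic, not our actual operators): keyed ValueState
>>>>> in a KeyedProcessFunction after keyBy(), plus a sink implementing
>>>>> CheckpointedFunction that keeps a ListState buffer as operator state.
>>>>>
>>>>> import org.apache.flink.api.common.state.ListState;
>>>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>> import org.apache.flink.configuration.Configuration;
>>>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>>>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>>
>>>>> public class BothStateKindsJob {
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment env =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         env.enableCheckpointing(120_000); // 2 min, as in our config
>>>>>
>>>>>         env.fromElements("a", "b", "a")
>>>>>            .keyBy(v -> v)                 // keyed state lives after keyBy()
>>>>>            .process(new CountPerKey())
>>>>>            .addSink(new BufferingSink()); // operator state lives here
>>>>>
>>>>>         env.execute("both-state-kinds");
>>>>>     }
>>>>>
>>>>>     /** Keyed state: one counter per key. */
>>>>>     static class CountPerKey extends KeyedProcessFunction<String, String, Long> {
>>>>>         private transient ValueState<Long> count;
>>>>>
>>>>>         @Override
>>>>>         public void open(Configuration parameters) {
>>>>>             count = getRuntimeContext().getState(
>>>>>                     new ValueStateDescriptor<>("count", Long.class));
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void processElement(String value, Context ctx, Collector<Long> out)
>>>>>                 throws Exception {
>>>>>             Long c = count.value();
>>>>>             long next = (c == null ? 0L : c) + 1;
>>>>>             count.update(next);
>>>>>             out.collect(next);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     /** Operator (non-keyed) state: per-subtask buffer checkpointed via ListState. */
>>>>>     static class BufferingSink implements SinkFunction<Long>, CheckpointedFunction {
>>>>>         private transient ListState<Long> checkpointedBuffer;
>>>>>         private final List<Long> buffer = new ArrayList<>();
>>>>>
>>>>>         @Override
>>>>>         public void invoke(Long value, Context context) {
>>>>>             buffer.add(value);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>             checkpointedBuffer.update(buffer);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void initializeState(FunctionInitializationContext context) throws Exception {
>>>>>             checkpointedBuffer = context.getOperatorStateStore()
>>>>>                     .getListState(new ListStateDescriptor<>("buffered", Long.class));
>>>>>             for (Long v : checkpointedBuffer.get()) {
>>>>>                 buffer.add(v);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> The real job is of course larger and uses real sources and sinks, but both
>>>>> kinds of state are present, as in this sketch.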
>>>>>
>>>>> Kevin
>>>>>
>>>>> On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <zakelly....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Kevin,
>>>>>>
>>>>>> One more thing I'd like to confirm about the job: is this a DataStream
>>>>>> job with operator state (not only keyed state after `keyBy()`)?
>>>>>>
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Best,
>>>>>> Zakelly
>>>>>>
>>>>>> On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <
>>>>>> kevin....@alumni.princeton.edu> wrote:
>>>>>>
>>>>>>> Thanks so much!
>>>>>>>
>>>>>>> More context: this file merging feature is really promising for our
>>>>>>> use case. Given the large state size and parallelism, we occasionally 
>>>>>>> run
>>>>>>> into S3 rate limits at checkpoint/savepoint time.
>>>>>>>
>>>>>>> The few times I tried file merging, it helped quite a bit, but I've
>>>>>>> had to turn it off for now due to this NPE, which happens occasionally
>>>>>>> but not always.
>>>>>>>
>>>>>>> On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> This sounds severe and I'm also taking a look...
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <zakelly....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Kevin,
>>>>>>>>>
>>>>>>>>> Thanks for the details; they're really helpful. I'm trying to
>>>>>>>>> reproduce this with your setup and will let you know if there are
>>>>>>>>> any updates.
>>>>>>>>>
>>>>>>>>> I've created a Jira issue to track this:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-38327
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Zakelly
>>>>>>>>>
>>>>>>>>> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <
>>>>>>>>> kevin....@alumni.princeton.edu> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Zakelly,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>
>>>>>>>>>> This job is running on Kubernetes using the Apache Flink
>>>>>>>>>> Kubernetes operator. This NullPointerException happened during a job
>>>>>>>>>> restart after one of the TaskManagers restarted because the 
>>>>>>>>>> underlying node
>>>>>>>>>> running the TaskManager pod was scaled down for maintenance. There 
>>>>>>>>>> was no
>>>>>>>>>> rescaling or parallelism change.
>>>>>>>>>>
>>>>>>>>>> The job is quite large due to heavy input traffic and state size
>>>>>>>>>> (a rough config sketch follows the list):
>>>>>>>>>> - parallelism: 2100
>>>>>>>>>> - taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
>>>>>>>>>> - RocksDB state backend used for state management
>>>>>>>>>> - checkpoints/savepoints written to S3 in AWS
>>>>>>>>>> - full state size ~600 GB according to the Flink Checkpoint UI
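>>>>>>>>>>
>>>>>>>>>> In flink-conf terms, the state setup is roughly the following (the
>>>>>>>>>> bucket name is a placeholder; the file-merging options from my first
>>>>>>>>>> mail come on top of this):
>>>>>>>>>>
>>>>>>>>>> parallelism.default: 2100
>>>>>>>>>> taskmanager.numberOfTaskSlots: 14
>>>>>>>>>> state.backend.type: rocksdb
>>>>>>>>>> state.checkpoints.dir: s3://<bucket>/checkpoints
>>>>>>>>>> state.savepoints.dir: s3://<bucket>/savepoints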
>>>>>>>>>>
>>>>>>>>>> Please let me know if more details would be helpful.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Kevin
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <zakelly....@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Kevin,
>>>>>>>>>>>
>>>>>>>>>>> Would you please provide more info about the setup? Is this a
>>>>>>>>>>> failover or a manual job restart, with or without a change of
>>>>>>>>>>> parallelism (rescale)?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Zakelly
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <kkim...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Has anyone seen this NullPointerException after enabling
>>>>>>>>>>>> checkpoint file merging? I'm running a job on Flink 1.20.2 with
>>>>>>>>>>>> these configs:
>>>>>>>>>>>>
>>>>>>>>>>>> execution.checkpointing.file-merging.enabled: true
>>>>>>>>>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>>>>>>>>>> execution.checkpointing.timeout: "10min"
>>>>>>>>>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>>>>>>>>>> execution.checkpointing.min-pause: "2min"
>>>>>>>>>>>> execution.checkpointing.interval: "2min"
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>> at
>>>>>>>>>>>> java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>>>>>>> at java.base/java.lang.Thread.run(Thread.java:840)
>>>>>>>>>>>>
>>>>>>>>>>>
