Hi Kevin,

Great! This gives us confidence to move forward with new releases. Please
let me know if there's anything I can do to help with checkpoint
file-merging.
Best,
Zakelly

On Wed, Sep 17, 2025 at 8:50 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:

> Hi Zakelly,
>
> On my side, we've pulled and deployed the fix from the release-1.20
> branch [1] in our production canary deployments, and we haven't seen any
> NPEs for ~3 days now. Thanks so much for the quick fix!
>
> Kevin
>
> On Wed, Sep 10, 2025 at 8:24 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Kevin,
>>
>> We've merged the fix into the 1.20 branch, and AFAIK, the community will
>> kick off the next patch release (1.20.3) soon. In the meantime, if you're
>> interested, you may build from the release-1.20 branch [1] to verify the
>> fix.
>>
>> Thank you again for reporting the issue and for your valuable feedback.
>>
>> [1] https://github.com/apache/flink/tree/release-1.20
>>
>> Best,
>> Zakelly
>>
>> On Wed, Sep 10, 2025 at 1:33 AM Kevin Kim <kevin....@alumni.princeton.edu> wrote:
>>
>>> Thank you! Looking forward to testing the fix on my side.
>>>
>>> Kevin
>>>
>>> On Tue, Sep 9, 2025 at 9:15 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> We've already merged the fix to master; backports are on the way :)
>>>>
>>>> BR,
>>>> G
>>>>
>>>> On Tue, Sep 9, 2025 at 2:26 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:
>>>>
>>>>> Hi Zakelly,
>>>>>
>>>>> Yes, this job contains both operator state and keyed state. Happy to
>>>>> provide more details as needed.
>>>>>
>>>>> Kevin
>>>>>
>>>>> On Mon, Sep 8, 2025 at 10:48 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>>>>>
>>>>>> Hi Kevin,
>>>>>>
>>>>>> One more thing I'd like to confirm about the job: is this a
>>>>>> DataStream job with operator state (not only keyed state after
>>>>>> `keyBy()`)?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Best,
>>>>>> Zakelly
>>>>>>
>>>>>> On Fri, Sep 5, 2025 at 9:53 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:
>>>>>>
>>>>>>> Thanks so much!
>>>>>>>
>>>>>>> More context: this file-merging feature is really promising for our
>>>>>>> use case. Given the large state size and parallelism, we occasionally
>>>>>>> run into S3 rate limits at checkpoint/savepoint time.
>>>>>>>
>>>>>>> The few times I tried file merging, it helped quite a bit, but I've
>>>>>>> had to turn it off for now due to this NPE, which happens occasionally
>>>>>>> but not always.
>>>>>>>
>>>>>>> On Fri, Sep 5, 2025, 1:58 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> This sounds severe and I'm also taking a look...
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>> On Fri, Sep 5, 2025 at 4:44 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kevin,
>>>>>>>>>
>>>>>>>>> Thanks for the details; they're really helpful. I'm trying to
>>>>>>>>> reproduce this according to your setup and will let you know if
>>>>>>>>> there are any updates.
>>>>>>>>>
>>>>>>>>> I've created a Jira issue to track this:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-38327
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Zakelly
>>>>>>>>>
>>>>>>>>> On Thu, Sep 4, 2025 at 7:42 PM Kevin Kim <kevin....@alumni.princeton.edu> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Zakelly,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply.
>>>>>>>>>>
>>>>>>>>>> This job is running on Kubernetes using the Apache Flink
>>>>>>>>>> Kubernetes Operator. The NullPointerException happened during a
>>>>>>>>>> job restart after one of the TaskManagers restarted, because the
>>>>>>>>>> underlying node running the TaskManager pod was scaled down for
>>>>>>>>>> maintenance. There was no rescaling or parallelism change.
>>>>>>>>>>
>>>>>>>>>> The job is quite large due to heavy input traffic + state size:
>>>>>>>>>> parallelism: 2100
>>>>>>>>>> taskmanager.numberOfTaskSlots: 14 (so 150 TaskManagers total)
>>>>>>>>>> RocksDBStateBackend used for state management.
>>>>>>>>>> Checkpoints/savepoints are written to S3 in AWS.
>>>>>>>>>> According to the Flink checkpoint UI, the full state size is ~600GB.
>>>>>>>>>>
>>>>>>>>>> Please let me know if more details would be helpful.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Kevin
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 4, 2025 at 6:16 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Kevin,
>>>>>>>>>>>
>>>>>>>>>>> Would you please provide more info about the setup? Was this a
>>>>>>>>>>> failover or a manual job restart, and with or without a change of
>>>>>>>>>>> parallelism (rescale)?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Zakelly
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 3, 2025 at 2:43 AM Kevin Kim <kkim...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Has anyone seen this NullPointerException after enabling
>>>>>>>>>>>> checkpoint file merging? I'm running a job on Flink 1.20.2 with
>>>>>>>>>>>> these configs:
>>>>>>>>>>>>
>>>>>>>>>>>> execution.checkpointing.file-merging.enabled: true
>>>>>>>>>>>> execution.checkpointing.file-merging.max-file-size: 32m
>>>>>>>>>>>> execution.checkpointing.timeout: "10min"
>>>>>>>>>>>> execution.checkpointing.tolerable-failed-checkpoints: 3
>>>>>>>>>>>> execution.checkpointing.min-pause: "2min"
>>>>>>>>>>>> execution.checkpointing.interval: "2min"
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.isManagedByFileMergingManager(FileMergingSnapshotManagerBase.java:927)
>>>>>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$11(FileMergingSnapshotManagerBase.java:866)
>>>>>>>>>>>>     at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
>>>>>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.lambda$restoreStateHandles$12(FileMergingSnapshotManagerBase.java:861)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>>>>>>>>>>>     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>>     at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
>>>>>>>>>>>>     at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
>>>>>>>>>>>>     at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
>>>>>>>>>>>>     at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:992)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>>>>>>>>>>>>     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>>>>>>>>>>>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>>>>>>>>>     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
>>>>>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBase.restoreStateHandles(FileMergingSnapshotManagerBase.java:858)
>>>>>>>>>>>>     at org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation.restore(SubtaskFileMergingManagerRestoreOperation.java:102)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.registerRestoredStateToFileMergingManager(StreamTaskStateInitializerImpl.java:353)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>>>>>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>>>>>>>     at java.base/java.lang.Thread.run(Thread.java:840)
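
For anyone wanting to try the same setup, the checkpointing settings scattered through the thread can be collected into a single Flink configuration fragment. This is a sketch of Kevin's reported configuration, not a recommendation; as the thread notes, on 1.20.2 this combination could hit the NPE tracked in FLINK-38327, so a build containing the fix (release-1.20 branch or 1.20.3+) is needed before enabling file merging:

```yaml
# Checkpoint file merging, as reported in this thread (Flink 1.20.x).
# NOTE: requires the FLINK-38327 fix; without it, restores may fail with
# the NullPointerException shown above.
execution.checkpointing.file-merging.enabled: true
execution.checkpointing.file-merging.max-file-size: 32m

# General checkpointing cadence from the same report.
execution.checkpointing.interval: "2min"
execution.checkpointing.min-pause: "2min"
execution.checkpointing.timeout: "10min"
execution.checkpointing.tolerable-failed-checkpoints: 3
```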