Hi,

We run a large-scale OTT analytics platform with tens of millions of users watching OTT content. The Kafka topic that receives this data has 120 partitions, so we run the job with a parallelism of 120 to process the data efficiently by scaling horizontally. That is the motivation for the parallelism of 120.

From the Flink UI I can see that it has no problem creating 120 parallel subtasks with 4 slots per task manager, so 30 task managers in total.

As I understand it, you are saying that the changelog file upload to S3 often takes a long time, and you therefore suggest increasing dstl.dfs.upload.timeout from its default of 1 second to a higher value. Is this correct?

I have also seen that the entire checkpoint completes within a few seconds, so do I still need to increase the checkpoint timeout beyond 10-15 minutes?

The next time the problem surfaces, I will enable the profiler to see what is congesting the CPU during restarts. However, the Flink metrics show busyTimeMsPerSecond to be fairly low for all my tasks.

As per the Yarn documentation for AWS EMR, local recovery is used, and restart times have been much lower since I enabled this configuration.

Please also let me know whether increasing these heartbeat-related settings would fix the problem:

heartbeat.rpc-failure-threshold: 10
heartbeat.timeout: 100000

I also did not understand what you mean by "You give RocksDB roughly an overall size of ~300Gb". Since the state backend is stored in S3, it should have all the space it needs. I have allocated 100 GB of local disk space for each task manager, and I see disk utilization of only 25%.

I also did not understand this comment:

- Assuming that you didn’t augment the default value for max parallelism 256, a parallelism of 120 means that 90% of your subtasks process 2 key-slots each, whereas 10% process 3 key-slots. That leads to a processing bias of +50% (3/2)

Which exact Flink or autoscaler config do you mean by the default value for max parallelism? I don't think I have changed anything here, and I am running with the defaults.

Finally, since I live in India, I will not be able to attend the FlinkForward Conference in Barcelona.

Thanks
Sachin

On Mon, Sep 22, 2025 at 1:03 PM Schwalbe Matthias <[email protected]> wrote:

> Hi Sachin,
>
> A couple of observations which might help improve your situation:
>
> - The first timeout/exception happens within the change log writer which is set to only one second
> - I/O jitter towards the DFS can easily lead to occasional responses that take more than one second
> - This is up to the point that for our jobs on Kubernetes we cannot afford to enable the change log feature
> - Moderately augmenting the timeout configuration might improve your situation, however, change log is probably quite sensitive to this
> - Your checkpoint interval is set to 15 minutes, I would try to increase the checkpoint timeout at least a little bit
> - When restarting, your task manager CPU is congested up to the point that pekko receiver loop fails to give timely responses
> - Did you take profiles on the Profiler tab of your task manager? That usually gives me best information on bottlenecks that are not on task level (needs to be enabled in config)
> - Unfortunately, when loading a checkpoint/savepoint from DFS, it takes long time. I see a lot of unbuffered I/O towards the DFS during that phase. However that is difficult to fix without a Flink fork.
> - In your configuration I see local recovery, is that actually used on the restarted task managers on Yarn? (you probably checked that 😊 )
> - You give RocksDB roughly an overall size of ~300Gb, when RocksDB data is likely around 50Gb
> - What is the motivation for going with parallelism 120,
> - Is it based (bottlenecked) on RocksDB I/O, or CPU capacity?
> - Assuming that you didn’t augment the default value for max parallelism 256, a parallelism of 120 means that 90% of your subtasks process 2 key-slots each, whereas 10% process 3 key-slots. That leads to a processing bias of +50% (3/2)
>
> So much for now, I hope that helps.
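
A note on the key-slot remark quoted above: Flink calls these "key groups", and their number equals the job's max parallelism. The sketch below reproduces the arithmetic in plain Java. It assumes the default max parallelism is derived as 1.5 times the parallelism rounded up to a power of two (bounded to 128..32768), and that key groups are split into contiguous ranges per subtask; both are my understanding of Flink's behaviour and worth checking against the Flink version in use. The class and method names are hypothetical. If this reading is right, the relevant knob would be `pipeline.max-parallelism` (or `env.setMaxParallelism(...)`), not an autoscaler setting.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyGroupSkew {

    // Assumed default: round (1.5 * parallelism) up to a power of two,
    // then clamp to the range [128, 32768].
    public static int defaultMaxParallelism(int parallelism) {
        int candidate = parallelism + parallelism / 2;
        int pow2 = Integer.highestOneBit(candidate);
        if (pow2 < candidate) {
            pow2 <<= 1;
        }
        return Math.min(Math.max(pow2, 128), 32768);
    }

    // Number of key groups in the contiguous range assigned to one subtask
    // (assumed range formula: start = ceil(i * max / p), end = ceil((i + 1) * max / p) - 1).
    public static int keyGroupsForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return end - start + 1;
    }

    // Maps "key groups per subtask" to "number of subtasks with that many key groups".
    public static Map<Integer, Integer> histogram(int maxParallelism, int parallelism) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < parallelism; i++) {
            counts.merge(keyGroupsForSubtask(maxParallelism, parallelism, i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 120;
        int maxParallelism = defaultMaxParallelism(parallelism);
        System.out.println("max parallelism: " + maxParallelism);
        System.out.println("key groups -> subtasks: " + histogram(maxParallelism, parallelism));
    }
}
```

Under these assumptions the result is a max parallelism of 256, with 104 subtasks owning 2 key groups and 16 subtasks owning 3, which is close to the 90%/10% split mentioned above. A max parallelism that is a multiple of 120 (for example 240 or 360) would give every subtask the same number of key groups, but as far as I know max parallelism cannot be changed for a job that restores from existing keyed state.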
>
> Will you be at the FlinkForward Conference in Barcelona? There we could also exchange experience in person …
>
> Thias
>
> *From:* Sachin Mittal <[email protected]>
> *Sent:* Saturday, September 20, 2025 11:41 AM
> *To:* user <[email protected]>
> *Subject:* [External] A random checkpoint failure creates an avalanche of restarts
>
> Hi,
>
> So we are running Flink 19.1 on AWS EMR using Yarn as resource manager.
>
> We have a fairly large cluster with 120 parallelism and 30 task managers running with 4 task slots.
>
> Here are some of the important configs:
>
> taskmanager.numberOfTaskSlots: 4
> jobmanager.memory.process.size: 12g
> taskmanager.memory.process.size: 24g
> taskmanager.memory.task.off-heap.size: 1g
> taskmanager.memory.managed.fraction: 0.5
> taskmanager.memory.network.fraction: 0.05
> taskmanager.memory.jvm-overhead.fraction: 0.05
> state.backend.type: rocksdb
> state.checkpoints.dir: s3://bucket/flink-checkpoints
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://bucket/changelog
> dstl.dfs.compression.enabled: 'true'
>
> My job checkpoint configs are:
>
> env.enableCheckpointing(900000L);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L);
> env.getCheckpointConfig().setCheckpointTimeout(900000L);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().enableUnalignedCheckpoints();
>
> Also checkpoint full size can be very large, up to 50 GB.
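
A note on the "~300Gb" figure: one plausible reading, which only Thias can confirm, is that it refers to the managed memory that Flink hands to RocksDB across all task managers, not to disk or S3 space. The sketch below redoes that arithmetic from the configuration quoted above. It assumes the JVM metaspace size (256m) and the JVM overhead bounds (192m to 1g) are at what I believe are Flink's defaults, since they are not in the listed config, and that the managed fraction applies to total Flink memory (process size minus metaspace and JVM overhead). The class and method names are hypothetical.

```java
public class ManagedMemoryEstimate {

    // JVM overhead is a fraction of process memory, clamped to [minMb, maxMb].
    public static long jvmOverheadMb(long processMb, double fraction, long minMb, long maxMb) {
        long byFraction = (long) (processMb * fraction);
        return Math.max(minMb, Math.min(maxMb, byFraction));
    }

    // Managed memory per task manager under the assumed defaults described above.
    public static long managedMbPerTaskManager(long processMb,
                                               double overheadFraction,
                                               double managedFraction) {
        long overheadMb = jvmOverheadMb(processMb, overheadFraction, 192, 1024); // assumed default bounds
        long metaspaceMb = 256;                                                   // assumed default
        long totalFlinkMb = processMb - overheadMb - metaspaceMb;
        return (long) (totalFlinkMb * managedFraction);
    }

    public static void main(String[] args) {
        long processMb = 24L * 1024; // taskmanager.memory.process.size: 24g
        int taskManagers = 30;       // 120 slots / 4 slots per task manager

        long perTm = managedMbPerTaskManager(processMb, 0.05, 0.5);
        long clusterMb = perTm * taskManagers;

        System.out.println("managed memory per task manager (MB): " + perTm);
        System.out.println("managed memory across cluster (GB): " + clusterMb / 1024.0);
    }
}
```

Under these assumptions each task manager gets roughly 11.4 GB of managed memory, or about 341 GB across 30 task managers, against a full checkpoint size of up to 50 GB. That would be consistent with the remark that RocksDB is given far more memory than its data needs.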
>
> Now it was running fine for a long time but randomly we got the following exception:
>
> java.io.IOException: Could not perform checkpoint 70 for operator ... (80/120)#0.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
>     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 70 for operator ... (80/120)#0. Failure reason: Checkpoint was declined.
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
>     ... 22 more
> Caused by: java.io.IOException: The upload for 1439 has already failed previously
>     at org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:463)
>     at org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:234)
>     at org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)
>     at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:406)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:258)
>     ... 33 more
> Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out after 1000ms
>     at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java:319)
>     at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$1(RetryingExecutor.java:314)
>
> After this I start getting many timeout exceptions where randomly one or other task manager becomes unreachable and restarts start happening very frequently. Here are few of these stack traces:
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_... timed out.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1550)
>     at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
>     ...
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '...'. This might indicate that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     ...
>
> org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id ... is no longer reachable.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1566)
>     at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
>     at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
>     ...
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '...'. This indicates that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325)
>     ...
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
