Hi Huang,

It says checkpoint *Expired* with the following log:
2022-03-16 03:03:22,641 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 13 ...
2022-03-16 03:03:22,641 WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream - Interrupted object upload
java.lang.InterruptedException
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
    at java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:441)
    at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:360)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3hadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:306)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
    at java.lang.Thread.run(Thread.java:750)
2022-03-16 03:03:22,641 WARN  org.apache.hadoop.fs.s3a.S3AInstrumentation - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=1107015, bytesUploaded=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
2022-03-16 03:03:22,641 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Cannot delete closed and discarded state stream for s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02
java.io.InterruptedIOException: getFileStatus on s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02: com.amazonaws.AbortedException:
    at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2187)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
    at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:107)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:311)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.AbortedException:
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:740)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
    ... 14 more
Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:917)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:903)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1097)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)

On Thu, Mar 17, 2022 at 9:11 AM yu'an huang <h.yuan...@gmail.com> wrote:

> Hi, so the problem is about checkpoints. We need to understand why there
> are checkpoint failures. Can you provide more logs? We need to check the
> log to see more details about the first failed checkpoint.
>
> On 17 Mar 2022, at 9:41 AM, Vijayendra Yadav <contact....@gmail.com> wrote:
>
> Hi Flink Team,
>
> I am using Flink 1.11 with the Kinesis consumer and S3 streaming file
> writes, with an S3 checkpoint backend. This is a streaming service.
>
> Usually a couple of checkpoints fail without causing issues, but after a
> week or so of running, the checkpoint failures become irrecoverable; the
> application keeps running, but it is in a bad state and data flow blocks.
>
> Refer to the graph below:
> <image.png>
>
> Flink checkpoint configurations are as below.
> Note: time units in milliseconds.
>
> flink.checkpoint.interval=10000
> flink.checkpoint.minPauseInterval=500
> flink.checkpoint.Timeout=10000
> flink.checkpoint.maxConcurrent=1
> flink.checkpoint.preferCheckPoint=true
>
> kinesis.shard.getrecords.max=10000
> kinesis.shard.getrecords.interval=10000
> kinesis.initial.position=LATEST
>
> EXCEPTION on job:
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
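One thing that stands out in the quoted settings is that flink.checkpoint.Timeout equals flink.checkpoint.interval (both 10000 ms): a checkpoint that is not fully acknowledged within its configured timeout is declared Expired by Flink, so any checkpoint slowed down by S3 uploads beyond 10 s would expire here. A minimal, self-contained Java sketch of that arithmetic (the property names assume they map to Flink's checkpoint timeout and interval, as the email suggests; the class and method names below are made up for illustration, not Flink APIs):

```java
// Illustrative sketch only: constants mirror the configuration quoted in
// the mail above; CheckpointTimeoutSketch is a hypothetical helper class.
public class CheckpointTimeoutSketch {

    // flink.checkpoint.interval=10000 (ms between checkpoint triggers)
    static final long INTERVAL_MS = 10_000;
    // flink.checkpoint.Timeout=10000 (ms before a checkpoint is Expired)
    static final long TIMEOUT_MS = 10_000;

    // Flink marks a checkpoint Expired when it takes longer than the
    // configured timeout to complete.
    static boolean wouldExpire(long checkpointDurationMs) {
        return checkpointDurationMs > TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Zero headroom: the timeout is no larger than the interval.
        System.out.println("headroom ms = " + (TIMEOUT_MS - INTERVAL_MS));
        // A slow S3 multipart upload stretching a checkpoint to 25 s:
        System.out.println("25s checkpoint expires = " + wouldExpire(25_000));
    }
}
```

If that is the bottleneck, a common first step is raising the checkpoint timeout well above the interval (for example several minutes) via CheckpointConfig#setCheckpointTimeout; CheckpointConfig#setTolerableCheckpointFailureNumber controls when the "Exceeded checkpoint tolerable failure threshold" error above is thrown. Whether that matches your root cause would need the earlier logs yu'an asked for.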