Hi Gagan,
        I have met with the error the checkpoint timeout too.
        In my case, it is not due to big checkpoint size,  but due to slow sink 
then cause high backpressure to the upper operator. Then the barrier may take a 
long time to arrive to sink.
        Please check if it is the case you have met. 

Best
Henry

> 在 2018年10月30日,下午6:07,Gagan Agrawal <agrawalga...@gmail.com> 写道:
> 
> Hi,
> We have a flink job (flink version 1.6.1) which unions 2 streams to pass 
> through custom KeyedProcessFunction with RocksDB state store which final 
> creates another stream into Kafka. Current size of checkpoint is around 
> ~100GB and checkpoints are saved to s3 with 5 mins interval and incremental 
> checkpoint enabled. Checkpoints mostly finish in less than 1 min. We are 
> running this job on yarn with following parameters
> 
> -yn 10  (10 task managers)
> -ytm 2048 (2 GB each)
> - Operator parallelism is also 10.
> 
> While trying to run savepoint on this job, it runs for ~10mins and then 
> throws following error. Looks like checkpoint default timeout of 10mins is 
> causing this. What is recommended way to run savepoint for such job? Should 
> we increase checkpoint default timeout of 10mins? Also currently our state 
> size is 100GB but it is expected to grow unto 1TB. Is flink good for usecases 
> with that much of size? Also how much time savepoint is expected to take with 
> such state size and parallelism on Yarn? Any other recommendation would be of 
> great help.
> 
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
> 434398968e635a49329f59a019b41b6f failed.
>       at 
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
>       at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
>       at 
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
>       at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
>       at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>       at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>       at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: java.util.concurrent.CompletionException: 
> java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint 
> expired before completing
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
>       at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>       at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>       at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: java.lang.Exception: 
> Checkpoint expired before completing
>       at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>       at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>       at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>       at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

Reply via email to