[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException
[ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467013#comment-16467013 ]

Stefan Richter commented on FLINK-9302:
---

Thanks for the update, I appreciate it. This information can also be helpful to others who run into similar problems.

> Checkpoints continues to fail when using filesystem state backend with
> CIRCULAR REFERENCE:java.io.IOException
> -
>
> Key: FLINK-9302
> URL: https://issues.apache.org/jira/browse/FLINK-9302
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.4.2
> Reporter: Narayanan Arunachalam
> Priority: Major
>
> *state backend: filesystem*
> *checkpoint.mode: EXACTLY_ONCE*
>
> +dag:+
>
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
>
> * The job runs fine and checkpoints succeed for a few hours.
> * Later it fails because of the following checkpoint error.
> * Once the job is recovered from the last successful checkpoint, it continues to fail with the same checkpoint error.
> * This persists until the job is restarted with no checkpoint state, or using the checkpoint previous to the last good one.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>     ... 5 more
>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>         ... 5 more
>     Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>         ... 7 more
>     Caused by: java.io.IOException: Could not flush and close the file system output stream to s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0 in order to obtain the stream state handle
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:385)
>         at
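The setup described in the issue (filesystem state backend writing to S3, EXACTLY_ONCE checkpointing) can be sketched as below. This is a minimal sketch against the Flink 1.4 Scala API; the bucket path and checkpoint interval are assumptions for illustration, not values from the issue.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment

// Filesystem state backend: checkpoint data is streamed to the given URI.
// The bucket/path here is hypothetical; the issue's real path appears in the trace above.
sEnv.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"))

// checkpoint.mode: EXACTLY_ONCE. The 60s interval is an assumption;
// the issue does not state the actual interval used.
sEnv.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
```

With this backend, each checkpoint is materialized by flushing and closing an `FsCheckpointStateOutputStream` per operator, which is exactly the step that fails in the trace above when the S3 upload breaks.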
[ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281 ]

Narayanan Arunachalam commented on FLINK-9302:
--

Thanks [~srichter]. I ran some tests over the weekend and found that although the error is reported by S3, it was actually caused by the size of the checkpoint. In one of my tests, after the job recovered from the last good checkpoint, the state size continued to grow at ~10G every 15 mins. This was because too much data was read into the pipeline with `maxOutOfOrderness` set to 60 secs. The windows won't fire soon enough, resulting in large states. One option is to scale the cluster to deal with the many open windows. But I realized `AscendingTimestampExtractor` might be enough for my use case and am running some tests with this setting. In any case, this particular error is a side effect of the way my windows are set up and not direct evidence of any bug. Thought I would post this comment anyway to keep you all in sync.
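The two watermarking strategies contrasted in the comment above can be sketched as below, against the Flink 1.4 Scala API. The `Event` type, its `timestamp` field, and the source wiring are hypothetical stand-ins; the issue's real event type is only accessed via `_.get(EVENT_GROUP_ID)`.

```scala
import org.apache.flink.streaming.api.functions.timestamps.{AscendingTimestampExtractor, BoundedOutOfOrdernessTimestampExtractor}
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type for illustration.
case class Event(timestamp: Long, groupId: String)

// Original setup: watermarks trail the max seen timestamp by 60s
// (maxOutOfOrderness). During a catch-up read, watermarks lag far behind,
// session windows cannot fire, and keyed state keeps accumulating.
val outOfOrder = new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(60)) {
  override def extractTimestamp(e: Event): Long = e.timestamp
}

// Alternative tested in the comment: assumes timestamps arrive in ascending
// order, so the watermark advances immediately and windows fire sooner,
// keeping state (and therefore checkpoints) smaller.
val ascending = new AscendingTimestampExtractor[Event] {
  override def extractAscendingTimestamp(e: Event): Long = e.timestamp
}

// Assigned before keyBy/window, e.g.:
// sEnv.addSource(makeKafkaSource(config)).map(makeEvent)
//   .assignTimestampsAndWatermarks(ascending)
```

Note that `AscendingTimestampExtractor` only fits sources whose timestamps are in fact monotonically increasing (per partition); out-of-order elements would be flagged and treated as late.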
[ https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465575#comment-16465575 ]

Stefan Richter commented on FLINK-9302:
---

If you take a look at this stack trace, it tells you that the cause of the problem is that the Amazon SDK had trouble with the connection to S3 and therefore could not complete writing the checkpoint:

{{Caused by: com.amazonaws.SdkClientException: Unable to complete multi-part upload. Individual part upload failed : Unable to execute HTTP request: Broken pipe (Write failed)}}

This does not look like a Flink problem, more likely a hiccup with S3. There are also proposed solutions to this exception if you google it.