[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-08 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467013#comment-16467013
 ] 

Stefan Richter commented on FLINK-9302:
---

Thanks for the update, I appreciate it. This information can also be helpful to 
others who run into similar problems.

> Checkpoints continues to fail when using filesystem state backend with 
> CIRCULAR REFERENCE:java.io.IOException
> -
>
> Key: FLINK-9302
> URL: https://issues.apache.org/jira/browse/FLINK-9302
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> *state backend: filesystem*
> *checkpoint.mode:EXACTLY_ONCE*
> +dag:+
> val streams = sEnv
>   .addSource(makeKafkaSource(config))
>   .map(makeEvent)
>   .keyBy(_.get(EVENT_GROUP_ID))
>   .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>   .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
>   .apply(makeEventsList)
>   .addSink(makeNoOpSink)
>  * The job runs fine and checkpoints succeed for a few hours. 
>  * Later it fails because of the following checkpoint error.
>  * Once the job is recovered from the last successful checkpoint, it 
> continues to fail with the same checkpoint error.
>  * This persists until the job is restarted with no checkpoint state or using 
> the checkpoint previous to the last good one.
> AsynchronousException\{java.lang.Exception: Could not materialize checkpoint 
> 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: 
> NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 42 for 
> operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, 
> makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed 
> state future.
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
> ... 7 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:385)
> at 
> 

[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-07 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281
 ] 

Narayanan Arunachalam commented on FLINK-9302:
--

Thanks [~srichter]. I ran some tests over the weekend and found that although 
the error is reported by S3, the actual cause was the size of the checkpoint. 
In one of my tests, after the job recovered from the last good checkpoint, the 
state size continued to grow at ~10G every 15 mins.

This was because too much data was read into the pipeline with 
`maxOutOfOrderness` set to 60 secs. The windows would not fire soon enough, 
resulting in large states. One option is to scale the cluster to deal with the 
many open windows. But I realized `AscendingTimestampExtractor` might be enough 
for my use case, and I am running some tests with that setting.

In any case, this particular error is a side effect of the way my windows are 
set up and not direct evidence of any bug. I thought I would post this comment 
anyway to keep you all in sync.
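
For reference, the change I am testing looks roughly like this (a sketch 
against the Flink 1.4 Scala API; the `Event` type and its `timestamp` field in 
epoch millis are assumptions, since the original DAG accesses fields via 
`_.get(...)`):

{code}
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

// AscendingTimestampExtractor advances the watermark to the latest timestamp
// seen, so session windows fire as soon as the 60s gap elapses in event time,
// instead of lagging an extra maxOutOfOrderness behind and holding open state.
val streams = sEnv
  .addSource(makeKafkaSource(config))
  .map(makeEvent)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event] {
    override def extractAscendingTimestamp(e: Event): Long = e.timestamp
  })
  .keyBy(_.get(EVENT_GROUP_ID))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
  .apply(makeEventsList)
  .addSink(makeNoOpSink)
{code}

Note this is only appropriate when timestamps per Kafka partition really are 
ascending; otherwise late records are dropped from the windows.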

 


[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-07 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16465575#comment-16465575
 ] 

Stefan Richter commented on FLINK-9302:
---

If you take a look at this stack trace, it tells you that the cause of the 
problem is that the Amazon SDK had trouble with the connection to S3 and 
therefore could not complete writing the checkpoint.

{{Caused by: com.amazonaws.SdkClientException: Unable to complete multi-part 
upload. Individual part upload failed : Unable to execute HTTP request: Broken 
pipe (Write failed)}}

This does not look like a Flink problem; it is more likely a hiccup with S3. 
There are also proposed solutions to this exception if you google it.
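
For what it's worth, when the checkpoint filesystem is backed by the Hadoop 
s3a connector, the commonly proposed mitigations are to raise the SDK retry 
and timeout settings in core-site.xml. The property names below are standard 
s3a options; the values are illustrative, not recommendations:

{code}
<!-- Illustrative core-site.xml fragment for the Hadoop s3a filesystem. -->
<property>
  <name>fs.s3a.attempts.maximum</name>
  <value>20</value> <!-- retry failed requests (e.g. part uploads) more times -->
</property>
<property>
  <name>fs.s3a.connection.timeout</name>
  <value>200000</value> <!-- socket timeout, in milliseconds -->
</property>
<property>
  <name>fs.s3a.connection.establish.timeout</name>
  <value>5000</value> <!-- connection setup timeout, in milliseconds -->
</property>
{code}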
