Re: Keyed CEP checkpoint fails
Hi Dawid,

After rewriting hashCode() with Objects.hash over all the fields, I still get the same error. One peculiar thing: no matter how many times I retry, the checkpoints always fail at 428. Does that mean anything?

> On Aug 10, 2017, at 9:14 AM, Dawid Wysakowicz wrote:
> [...]
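A minimal sketch of an equals/hashCode pair whose hash cannot drift, assuming the event's identity is defined by a few fields. The class name StableEvent and its fields are hypothetical; the real MilestoneEvent was not shown in the thread. Objects.hash only helps if none of the hashed fields change after the event reaches the CEP operator, so here the fields are final, and equals compares exactly the fields that hashCode hashes.

```java
import java.util.Objects;

// Hypothetical event type: field names are illustrative, not taken from the thread.
final class StableEvent {
    private final String userId;
    private final String milestone;
    private final long timestamp;

    StableEvent(String userId, String milestone, long timestamp) {
        this.userId = userId;
        this.milestone = milestone;
        this.timestamp = timestamp;
    }

    public String getUserId() { return userId; }
    public String getMilestone() { return milestone; }
    public long getTimestamp() { return timestamp; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StableEvent)) return false;
        StableEvent other = (StableEvent) o;
        // Compare exactly the fields that hashCode() uses.
        return timestamp == other.timestamp
                && Objects.equals(userId, other.userId)
                && Objects.equals(milestone, other.milestone);
    }

    @Override
    public int hashCode() {
        // Every hashed field is final, so this value cannot change after construction.
        return Objects.hash(userId, milestone, timestamp);
    }

    @Override
    public String toString() {
        return "StableEvent{userId=" + userId + ", milestone=" + milestone
                + ", timestamp=" + timestamp + "}";
    }
}
```

One trade-off to weigh: with final fields and no setters, Flink will probably not treat the class as a POJO and may fall back to its generic (Kryo) serializer. If POJO serialization matters, keeping setters but never calling them once the event has been emitted is the alternative.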
Re: Keyed CEP checkpoint fails
Yes, with the information I have, my conclusion is the same: I think the problem is with the hashCode. Unfortunately, without some data to reproduce it, I won’t be able to help you further. I can only advise you to debug the SharedBuffer#serialize method and pay attention to the entryID map.

> On 10 Aug 2017, at 14:54, Daiqing Li wrote:
> [...]
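To make the entryID advice concrete, here is a deliberately simplified model, assuming the serializer first assigns an integer id to every buffered entry in a HashMap and later resolves entries through that same map. It is not Flink's SharedBuffer code; MutableEvent, EntryIdModel and resolveIds are hypothetical names. If an entry's hashCode changes between the two phases, the lookup returns null, which is the same shape of failure as "Could not find id for entry".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical event whose hashCode depends on a mutable field.
final class MutableEvent {
    String payload;

    MutableEvent(String payload) { this.payload = payload; }

    @Override
    public boolean equals(Object o) {
        return o instanceof MutableEvent && Objects.equals(payload, ((MutableEvent) o).payload);
    }

    @Override
    public int hashCode() { return Objects.hashCode(payload); }
}

// Simplified two-phase "entry -> id" model; not Flink's actual implementation.
final class EntryIdModel {
    static List<Integer> resolveIds(List<MutableEvent> entries, Runnable mutateBetweenPhases) {
        // Phase 1: number every entry, keyed by its current hashCode/equals.
        Map<MutableEvent, Integer> entryIds = new HashMap<>();
        for (MutableEvent e : entries) {
            entryIds.put(e, entryIds.size());
        }
        // Anything that changes a hashed field here breaks phase 2.
        mutateBetweenPhases.run();
        // Phase 2: look every entry up again through the same map.
        List<Integer> resolved = new ArrayList<>();
        for (MutableEvent e : entries) {
            Integer id = entryIds.get(e);
            if (id == null) {
                throw new IllegalStateException("Could not find id for entry: " + e.payload);
            }
            resolved.add(id);
        }
        return resolved;
    }
}
```

In a real job the "mutation" could be an event modified after it was emitted, or a hashCode that depends on state that is not the same on every copy of the event.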
Re: Keyed CEP checkpoint fails
Oh sorry, the data in {} is not actually empty; I hid private information about my model. Do you still come to the same conclusion?

> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz wrote:
> [...]
Re: Keyed CEP checkpoint fails
You are right, I won’t be able to reproduce this problem without data. One thing I can say, though, is that I think the problem is indeed with the hashCode. Unfortunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event's toString(), which looks odd, as if your event were empty.

Generally speaking, as I understand it, this exception is thrown because the hashCode of your event changes during serialization, so lookups in an internal temporary cache fail.

> On 10 Aug 2017, at 14:29, Daiqing Li wrote:
> [...]
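One way to test the suspicion that the event's hashCode changes during serialization is a round-trip check: serialize the event, deserialize it, and confirm that the copy equals the original and has the same hashCode. The sketch below uses plain Java serialization as a stand-in and assumes the event implements Serializable; Flink serializes state with its own serializers (POJO or Kryo), so a faithful check would apply the same idea with the job's actual serializer. RoundTripCheck, ValueEvent and IdentityEvent are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

final class RoundTripCheck {
    // Serializes and deserializes obj, then reports whether the copy is equal and hashes the same.
    static boolean survivesRoundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        Object copy;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = in.readObject();
        }
        return copy.equals(obj) && copy.hashCode() == obj.hashCode();
    }
}

// Value semantics: equal content means equal objects and equal hashes.
final class ValueEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String payload;

    ValueEvent(String payload) { this.payload = payload; }

    @Override
    public boolean equals(Object o) {
        return o instanceof ValueEvent && Objects.equals(payload, ((ValueEvent) o).payload);
    }

    @Override
    public int hashCode() { return Objects.hashCode(payload); }
}

// No equals/hashCode override: identity semantics, so any copy looks like a different event.
final class IdentityEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String payload;

    IdentityEvent(String payload) { this.payload = payload; }

    String getPayload() { return payload; }
}
```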
Re: Keyed CEP checkpoint fails
Hi,

Here is the code. But I am not sure you can reproduce the problem without the data source.

Best,
Daiqing

On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
> [...]

Attachments: MilestoneEvent.java, example.java
Re: Keyed CEP checkpoint fails
As @Kostas asked in your previous thread, would it be possible for you to share the code for that job, or at least a minimal example that reproduces this behaviour? I fear we won’t be able to help you without further info.

Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li wrote:
> [...]
Keyed CEP checkpoint fails
Hi Flink users,

I am using FsStateBackend and AWS EMR YARN to run a CEP job. After it runs for a while, I get the exception below. Could anyone give me some help debugging this? I tried parallelism 1, and it has the same problem. I also tried reimplementing the hashCode and equals methods; right now I use a UUID as the hashCode.

2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
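On the UUID-as-hashCode approach: a UUID from UUID.randomUUID() is only stable if it is generated once and then carried along with the event through every copy. If it is regenerated (for example, computed inside hashCode(), or kept in a transient field that serializers typically skip), the hash changes from one copy to the next. Below is a sketch of a deterministic alternative, assuming the event's identity can be reduced to a canonical string: derive the id with UUID.nameUUIDFromBytes, so every copy with the same content gets the same UUID. This is a swapped-in technique, not one suggested in the thread, and ContentIdEvent and its fields are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;

// Hypothetical event whose id is a UUID derived from its content rather than a random one.
final class ContentIdEvent {
    private final String userId;
    private final long timestamp;
    private final UUID id;

    ContentIdEvent(String userId, long timestamp) {
        this.userId = userId;
        this.timestamp = timestamp;
        // Name-based (version 3) UUID: the same input bytes always yield the same UUID.
        this.id = UUID.nameUUIDFromBytes(
                (userId + "|" + timestamp).getBytes(StandardCharsets.UTF_8));
    }

    UUID getId() { return id; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ContentIdEvent)) return false;
        ContentIdEvent other = (ContentIdEvent) o;
        return timestamp == other.timestamp && Objects.equals(userId, other.userId);
    }

    @Override
    public int hashCode() {
        // Consistent with equals(): the UUID is a pure function of userId and timestamp.
        return id.hashCode();
    }
}
```

Functionally this behaves like hashing the same fields with Objects.hash; the UUID only adds a stable, printable id that can be logged while tracing an entry.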