Re: Keyed CEP checkpoint fails

2017-08-10 Thread Daiqing Li
Hi Dawid,

After rewriting hashCode with Objects.hash over all the fields, I still get the 
same error. One odd thing: after trying many times, the checkpoints always fail 
at 428. Does that mean anything?
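
Objects.hash over all fields is only as reliable as each field's own hashCode(). Two common traps, offered as assumptions since MilestoneEvent.java is not visible here: an array field is hashed by reference (identity), not by content, so an equals() written with Arrays.equals can disagree with the hash; and any field that is mutated after the event enters the pattern operator changes the hash. A minimal sketch of the array case, with a hypothetical ArrayFieldEvent:

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical event type, for illustration only (not the MilestoneEvent from this thread).
final class ArrayFieldEvent {
    private final String name;
    private final double[] readings;

    ArrayFieldEvent(String name, double[] readings) {
        this.name = name;
        this.readings = readings.clone(); // defensive copy so callers cannot mutate it later
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ArrayFieldEvent)) return false;
        ArrayFieldEvent other = (ArrayFieldEvent) o;
        // Content-based comparison of the array.
        return Objects.equals(name, other.name) && Arrays.equals(readings, other.readings);
    }

    // Pitfall kept for comparison: the double[] is passed as a single Object,
    // so its identity hash is used and equal events usually hash differently.
    int identityBasedHash() {
        return Objects.hash(name, readings);
    }

    // Content-based hash that agrees with equals().
    @Override
    public int hashCode() {
        return 31 * Objects.hashCode(name) + Arrays.hashCode(readings);
    }

    public static void main(String[] args) {
        ArrayFieldEvent a = new ArrayFieldEvent("m1", new double[] {1.0, 2.0});
        ArrayFieldEvent b = new ArrayFieldEvent("m1", new double[] {1.0, 2.0});
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
        // Compares identity-based hashes; not guaranteed either way.
        System.out.println(a.identityBasedHash() == b.identityBasedHash());
    }
}
```

The same reasoning applies to any field type that does not override hashCode() by content.
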
> On Aug 10, 2017, at 9:14 AM, Dawid Wysakowicz wrote:
> 
> Yes, with the information I have, my conclusion is the same: I think the 
> problem is with the hashCode. Unfortunately, without some data to reproduce it 
> I won’t be able to help you further. I can only advise you to debug the 
> method SharedBuffer#serialize and pay attention to the entryID map.

Re: Keyed CEP checkpoint fails

2017-08-10 Thread Dawid Wysakowicz
Yes, with the information I have, my conclusion is the same: I think the 
problem is with the hashCode. Unfortunately, without some data to reproduce it 
I won’t be able to help you further. I can only advise you to debug the method 
SharedBuffer#serialize and pay attention to the entryID map.
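
Before stepping through SharedBuffer#serialize, it may be quicker to test the event type in isolation: does hashCode() return the same value on repeated calls, and does a serialized-and-restored copy still equal the original and share its hash? The sketch below uses plain java.io serialization only as a rough proxy (Flink serializes state with its own TypeSerializer, not java.io), and StableEvent, TransientKeyEvent and HashStabilityCheck are hypothetical names. TransientKeyEvent shows one way a hash can silently change: it hashes a transient field that is lost in the round-trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical event whose equals/hashCode use only serialized, immutable fields.
final class StableEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String sensorId;
    private final long timestamp;

    StableEvent(String sensorId, long timestamp) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StableEvent)) return false;
        StableEvent other = (StableEvent) o;
        return timestamp == other.timestamp && Objects.equals(sensorId, other.sensorId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sensorId, timestamp);
    }
}

// Hypothetical event that hashes a transient field: it is null after deserialization.
final class TransientKeyEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient String cachedKey;

    TransientKeyEvent(String sensorId) {
        this.cachedKey = sensorId.toUpperCase();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TransientKeyEvent
                && Objects.equals(cachedKey, ((TransientKeyEvent) o).cachedKey);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(cachedKey);
    }
}

final class HashStabilityCheck {
    // Serializes and deserializes obj with java.io (a stand-in for Flink's serializers).
    static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    // True if hashCode() is repeatable and a restored copy is equal with the same hash.
    static boolean survivesRoundTrip(Serializable original) {
        if (original.hashCode() != original.hashCode()) return false;
        Object copy;
        try {
            copy = roundTrip(original);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round-trip failed", e);
        }
        return original.equals(copy) && original.hashCode() == copy.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(survivesRoundTrip(new StableEvent("sensor-1", 42L)));   // true
        System.out.println(survivesRoundTrip(new TransientKeyEvent("sensor-1"))); // false
    }
}
```

If the real event type fails a check like this, that alone would be enough to break a hash-keyed lookup.
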

> On 10 Aug 2017, at 14:54, Daiqing Li wrote:
> 
> Oh sorry, the data in {} is not actually empty; I hid private information 
> about my model. Would you still reach the same conclusion?





Re: Keyed CEP checkpoint fails

2017-08-10 Thread Daiqing Li
Oh sorry, the data in {} is not actually empty; I hid private information 
about my model. Would you still reach the same conclusion?
> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz wrote:
> 
> You are right, I won’t be able to reproduce this problem without data. One 
> thing I can say, though, is that I think the problem is indeed with the 
> hashCode. Unfortunately I don’t know Gson, but one strange thing I noticed is 
> the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 
> 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString(), 
> which looks odd, as if your event were empty.
> 
> Generally speaking, as I understand it, this exception is thrown because the 
> hashCode of your event changes during serialization, which breaks lookups in 
> some internal temporary cache.



Re: Keyed CEP checkpoint fails

2017-08-10 Thread Dawid Wysakowicz
You are right, I won’t be able to reproduce this problem without data. One 
thing I can say, though, is that I think the problem is indeed with the 
hashCode. Unfortunately I don’t know Gson, but one strange thing I noticed is 
the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), 
[SharedBufferEdge(null, 1)], 1). The first {} is your event.toString(), which 
looks odd, as if your event were empty.

Generally speaking, as I understand it, this exception is thrown because the 
hashCode of your event changes during serialization, which breaks lookups in 
some internal temporary cache.
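
The general failure mode can be reproduced in plain Java, without Flink. In this sketch a HashMap plays the role of a hash-keyed cache like the entryID map (an assumption about its general shape, not Flink's actual code), and MutableEvent and MutatedKeyDemo are hypothetical. Once a key's hashCode() changes after insertion, get() searches with the new hash and returns null, even though the entry is still in the map, which is plausibly the same kind of lookup failure that "Could not find id for entry" reports.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event whose hashCode() depends on a mutable field.
final class MutableEvent {
    int version; // mutable AND used by equals/hashCode: the bug being illustrated

    MutableEvent(int version) {
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MutableEvent && ((MutableEvent) o).version == version;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(version);
    }
}

final class MutatedKeyDemo {
    // entryIds plays the role of a hash-keyed cache of buffered entries.
    static Integer lookupAfterMutation(Map<MutableEvent, Integer> entryIds) {
        MutableEvent event = new MutableEvent(1);
        entryIds.put(event, 0);     // stored under hash 1

        event.version = 2;          // hash is now 2, but the entry stays where it was
        return entryIds.get(event); // searches with hash 2: no match
    }

    public static void main(String[] args) {
        Map<MutableEvent, Integer> entryIds = new HashMap<>();
        System.out.println(lookupAfterMutation(entryIds)); // null
        System.out.println(entryIds.size());               // 1: the entry is still there
    }
}
```

The fix in this toy case is to keep every field used by hashCode()/equals() unchanged for as long as the object is used as a key.
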

> On 10 Aug 2017, at 14:29, Daiqing Li wrote:
> 
> Hi,
> 
> Here is the code, but I am not sure you can reproduce the problem without 
> the data source.
> 
> Best,
> Daiqing





Re: Keyed CEP checkpoint fails

2017-08-10 Thread Daiqing Li
Hi,

Here is the code, but I am not sure you can reproduce the problem without
the data source.

Best,
Daiqing

On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

> As @Kostas asked in your previous thread, would it be possible for you to
> share the code for that job, or at least a minimal example that reproduces
> this behaviour? I fear we won’t be able to help you without further
> information.
>
> Regards,
> Dawid


MilestoneEvent.java
Description: Binary data


example.java
Description: Binary data


Re: Keyed CEP checkpoint fails

2017-08-10 Thread Dawid Wysakowicz
As @Kostas asked in your previous thread, would it be possible for you to share 
the code for that job, or at least a minimal example that reproduces this 
behaviour? I fear we won’t be able to help you without further information.

Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li wrote:
> 
> Hi Flink users,
> 
> I am using FsStateBackend on AWS EMR (YARN) to run a CEP job. After running 
> for a while, the job failed with this exception. Could anyone help me debug 
> it? I tried parallelism 1 and got the same problem. I also tried 
> reimplementing the hashCode and equals methods; right now I use a UUID as the 
> hashCode.
> 





Keyed CEP checkpoint fails

2017-08-10 Thread Daiqing Li
Hi Flink users,

I am using FsStateBackend on AWS EMR (YARN) to run a CEP job. After running
for a while, the job failed with this exception. Could anyone help me debug
it? I tried parallelism 1 and got the same problem. I also tried
reimplementing the hashCode and equals methods; right now I use a UUID as
the hashCode.
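
Using a UUID as the hash code can work, provided the UUID is generated once, stored in a field that is never reassigned, and used by equals() as well. Generating it inside hashCode(), or comparing other fields in equals(), breaks the contract that hash-based lookups rely on. A minimal sketch of that pattern, with hypothetical names (IdentifiedEvent, payload):

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical event whose identity is a UUID assigned exactly once.
final class IdentifiedEvent {
    private final UUID id;        // never reassigned, so hashCode() cannot drift
    private final String payload; // other data, deliberately left out of equals/hashCode

    IdentifiedEvent(UUID id, String payload) {
        this.id = Objects.requireNonNull(id, "id");
        this.payload = payload;
    }

    // The UUID is generated here, when the event is created, never inside hashCode().
    static IdentifiedEvent create(String payload) {
        return new IdentifiedEvent(UUID.randomUUID(), payload);
    }

    // A field-by-field copy keeps the same id, so it stays equal to the original.
    IdentifiedEvent copy() {
        return new IdentifiedEvent(id, payload);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof IdentifiedEvent)) return false;
        return id.equals(((IdentifiedEvent) o).id); // same field as hashCode()
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    // Checks that an equal copy is found in a hash-based collection.
    static boolean copyIsFound() {
        IdentifiedEvent original = create("milestone");
        Set<IdentifiedEvent> seen = new HashSet<>();
        seen.add(original);
        return seen.contains(original.copy());
    }

    public static void main(String[] args) {
        System.out.println(copyIsFound()); // true
    }
}
```

The design choice here is that identity, not content, defines equality: two events with identical payloads but different UUIDs are not equal.
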

2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more