Hi Vishal,
I think it might be due to this bug: 
https://issues.apache.org/jira/browse/FLINK-8226
It was merged for 1.4.1 and 1.5.0. Could you check with this changes applied? 
Would be really helpful. If the error still persists could you file a jira?

Regards
Dawid

> On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> 
> When checkpointing is turned on a simple CEP loop pattern
> 
>  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = 
> Pattern.<Tuple2<Integer, 
> SimpleBinaryEvent>>begin("start").where(checkStatusOn)
>         .followedBy("middle").where(checkStatusOn).times(2)
>         .next("end").where(checkStatusOn).within(Time.minutes(5))
> 
> I see failures.
> 
> SimpleBinaryEvent is
> 
> public class SimpleBinaryEvent implements Serializable {
> 
> private int id;
> private int sequence;
> private boolean status;
> private long time;
> 
> public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
> 
> this.id
>  = id;
>     this.sequence = sequence;
>     this.status = status;
>     this.time = time;
> }
> public int getId() {
>     return id;
> }
> public int getSequence() {
>     return sequence;
> }
> public boolean isStatus() {
>     return status;
> }
> public long getTime() {
>     return time;
> }
> @Override
> public boolean equals(Object o) {
>     if (this == o) return true;
>     if (o == null || getClass() != o.getClass()) return false;
> 
>     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> 
>     if (getId() != that.getId()) return false;
>     if (isStatus() != that.isStatus()) return false;
>     if (getSequence() != that.getSequence()) return false;
>     return getTime() == that.getTime();
> }
> 
> @Override
> public int hashCode() {
>     //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
>     int result = getId();
>     result = 31 * result + (isStatus() ? 1 : 0);
>     result = 31 * result + getSequence();
>     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
>     return result;
> }
> 
> @Override
> public String toString() {
>     return "SimpleBinaryEvent{" +
>             "id='" + id + '\'' +
>             ", status=" + status +
>             ", sequence=" + sequence +
>             ", time=" + time +
>             '}';
> }
> 
> }
> 
> failure cause:
> 
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator KeyedCEPPatternOperator -> Map (1/1).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException: Could not find id for entry: 
> SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, 
> sequence=95, time=1505503380000}), 1505503380000, 0),....
> 
> I am sure I have the equals() and hashCode() implemented the way it should 
> be. I have tried the Objects.hashCode too. In other instances I have had 
> CircularReference ( and thus stackOverflow ) on SharedBuffer.toString(), 
> which again points to issues with references ( equality and what not ). 
> Without checkpointing turned on it works as expected. I am running on a local 
> cluster. Is CEP production ready ?
> 
> I am using 1.3.2 Flink
> 

Attachment: signature.asc
Description: Message signed with OpenPGP

Reply via email to