When checkpointing is turned on, with a simple CEP loop pattern such as

 private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
        Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
        .followedBy("middle").where(checkStatusOn).times(2)
        .next("end").where(checkStatusOn).within(Time.minutes(5));

I see failures.

SimpleBinaryEvent is:

public class SimpleBinaryEvent implements Serializable {

private int id;
private int sequence;
private boolean status;
private long time;

public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
    this.id = id;
    this.sequence = sequence;
    this.status = status;
    this.time = time;
}
public int getId() {
    return id;
}
public int getSequence() {
    return sequence;
}
public boolean isStatus() {
    return status;
}
public long getTime() {
    return time;
}
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    SimpleBinaryEvent that = (SimpleBinaryEvent) o;

    if (getId() != that.getId()) return false;
    if (isStatus() != that.isStatus()) return false;
    if (getSequence() != that.getSequence()) return false;
    return getTime() == that.getTime();
}

@Override
public int hashCode() {
    //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
    int result = getId();
    result = 31 * result + (isStatus() ? 1 : 0);
    result = 31 * result + getSequence();
    result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
    return result;
}

@Override
public String toString() {
    return "SimpleBinaryEvent{" +
            "id='" + id + '\'' +
            ", status=" + status +
            ", sequence=" + sequence +
            ", time=" + time +
            '}';
}

}
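
For what it is worth, the equals()/hashCode() contract of a class like the one above can be checked in isolation. The following is only a minimal standalone sketch: `Event` is a hypothetical copy of the same fields and the same comparison and hashing logic, and `EventContractCheck` is a made-up name, not part of the Flink job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone check; not part of the actual job.
final class EventContractCheck {

    // Hypothetical copy of the event's fields, equals() and hashCode().
    static final class Event {
        final int id;
        final int sequence;
        final boolean status;
        final long time;

        Event(int id, int sequence, boolean status, long time) {
            this.id = id;
            this.sequence = sequence;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Event that = (Event) o;
            return id == that.id && sequence == that.sequence
                    && status == that.status && time == that.time;
        }

        @Override
        public int hashCode() {
            int result = id;
            result = 31 * result + (status ? 1 : 0);
            result = 31 * result + sequence;
            // Long.hashCode(time) is the same value as (int) (time ^ (time >>> 32)).
            result = 31 * result + Long.hashCode(time);
            return result;
        }
    }

    // True when a and b are equal in both directions, hash identically,
    // and b can be used to look up an entry stored under a.
    static boolean interchangeableAsKeys(Event a, Event b) {
        Map<Event, Integer> ids = new HashMap<>();
        ids.put(a, 0);
        return a.equals(b) && b.equals(a)
                && a.hashCode() == b.hashCode()
                && ids.containsKey(b);
    }

    public static void main(String[] args) {
        Event a = new Event(1, 95, true, 1505503380000L);
        Event b = new Event(1, 95, true, 1505503380000L);
        Event c = new Event(1, 96, true, 1505503380000L);
        System.out.println("equal copies interchangeable: " + interchangeableAsKeys(a, b));
        System.out.println("different sequence interchangeable: " + interchangeableAsKeys(a, c));
    }
}
```

A check like this only shows that fresh, equal copies behave as the same hash key. It says nothing about how the CEP operator serializes or looks up entries while taking a checkpoint.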

Failure cause:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',
status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....

I am sure equals() and hashCode() are implemented the way they should
be. I have also tried Objects.hash() (the commented-out line above). In
other instances I have hit a circular reference (and thus a
StackOverflowError) in SharedBuffer.toString(), which again points to
issues with references (equality and so on). Without checkpointing turned
on, everything works as expected. I am running on a local cluster. Is CEP
production ready?
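
To illustrate the kind of circular reference I mean, here is a minimal sketch of how a toString() that recurses into a referenced object overflows the stack once two objects point at each other. `Node` and `CyclicToStringDemo` are hypothetical names for illustration only; this is not Flink's SharedBuffer code, and I have not confirmed that SharedBuffer fails for this exact reason.

```java
// Hypothetical illustration; NOT Flink's SharedBuffer implementation.
final class CyclicToStringDemo {

    static final class Node {
        final String name;
        Node next; // may end up pointing back at a node that refers to this one

        Node(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            // String concatenation calls next.toString(), so a cycle never terminates.
            return "Node{" + name + ", next=" + next + "}";
        }
    }

    // toString() on a chain without a cycle terminates normally.
    static String acyclicToString() {
        Node a = new Node("a");
        a.next = new Node("b");
        return a.toString();
    }

    // Returns true if toString() on a two-node cycle ends in a StackOverflowError.
    static boolean cycleOverflows() {
        Node a = new Node("a");
        Node b = new Node("b");
        a.next = b;
        b.next = a; // close the cycle
        try {
            a.toString();
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(acyclicToString());
        System.out.println("cycle overflows: " + cycleOverflows());
    }
}
```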

I am using Flink 1.3.2.
