Thanks. I have confirmed the above behavior through tests.

On Tue, Jan 23, 2018 at 4:09 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hi Vishal,
>
> Thanks for checking, and glad to hear that your job works after the fix!
>
> As for the equals/hashCode question: if you are asking whether you have to
> implement an exact equals() method and the corresponding hashCode(), then
> the answer is yes. These methods are used when retrieving and cleaning up
> “outdated” data from FlinkCEP’s internal data structures. As a consequence,
> ambiguous implementations can lead to the wrong elements being cleaned up.
>
> Thanks,
> Kostas
>
> On Jan 21, 2018, at 3:32 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> Have tested against the 1.5 SNAPSHOT (I simply pulled the source code into
> my distribution and compiled it into my job jar). Both the test code and
> the cluster seem to work OK. Have not tested the "savepoint and resume"
> mode, but restore from checkpoint works: I brought the JM down and
> restarted it. I have to sanitize the output, but at least the exception is
> not thrown.
>
> One thing though, and please confirm:
>
> In CEP it seems that a POJO pushed into the window as part of a Pattern
> match has to have an "exact" equals/hashCode. In my case I had a custom
> equals/hashCode to enable "contains" in a different context, that is, I
> had deliberately left an instance variable out of the equals/hashCode
> contract. Is that a design decision or a requirement in CEP?
>
> Thanks and Regards.
>
> On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Will do.
>>
>> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> We don't have a schedule for bugfix releases but do them based on need.
>>> AFAIK, a discussion about a 1.4.1 release has not been started yet.
>>>
>>> Would you like to kick that off by sending a mail to the dev mailing
>>> list?
>>>
>>> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Thanks. We will.
>>>>
>>>> When is 1.4.1 scheduled for release?
>>>>
>>>> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
>>>>
>>>>> Hi Vishal,
>>>>> I think it might be due to this bug:
>>>>> https://issues.apache.org/jira/browse/FLINK-8226
>>>>> It was merged for 1.4.1 and 1.5.0. Could you check with these changes
>>>>> applied? That would be really helpful. If the error still persists, could
>>>>> you file a JIRA?
>>>>>
>>>>> Regards
>>>>> Dawid
>>>>>
>>>>> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>> >
>>>>> > When checkpointing is turned on, with a simple CEP loop pattern
>>>>> >
>>>>> > private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
>>>>> >     Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
>>>>> >         .followedBy("middle").where(checkStatusOn).times(2)
>>>>> >         .next("end").where(checkStatusOn).within(Time.minutes(5));
>>>>> >
>>>>> > I see failures.
>>>>> >
>>>>> > SimpleBinaryEvent is
>>>>> >
>>>>> > public class SimpleBinaryEvent implements Serializable {
>>>>> >
>>>>> >     private int id;
>>>>> >     private int sequence;
>>>>> >     private boolean status;
>>>>> >     private long time;
>>>>> >
>>>>> >     public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
>>>>> >         this.id = id;
>>>>> >         this.sequence = sequence;
>>>>> >         this.status = status;
>>>>> >         this.time = time;
>>>>> >     }
>>>>> >
>>>>> >     public int getId() {
>>>>> >         return id;
>>>>> >     }
>>>>> >
>>>>> >     public int getSequence() {
>>>>> >         return sequence;
>>>>> >     }
>>>>> >
>>>>> >     public boolean isStatus() {
>>>>> >         return status;
>>>>> >     }
>>>>> >
>>>>> >     public long getTime() {
>>>>> >         return time;
>>>>> >     }
>>>>> >
>>>>> >     @Override
>>>>> >     public boolean equals(Object o) {
>>>>> >         if (this == o) return true;
>>>>> >         if (o == null || getClass() != o.getClass()) return false;
>>>>> >
>>>>> >         SimpleBinaryEvent that = (SimpleBinaryEvent) o;
>>>>> >
>>>>> >         if (getId() != that.getId()) return false;
>>>>> >         if (isStatus() != that.isStatus()) return false;
>>>>> >         if (getSequence() != that.getSequence()) return false;
>>>>> >         return getTime() == that.getTime();
>>>>> >     }
>>>>> >
>>>>> >     @Override
>>>>> >     public int hashCode() {
>>>>> >         // return Objects.hash(getId(), isStatus(), getSequence(), getTime());
>>>>> >         int result = getId();
>>>>> >         result = 31 * result + (isStatus() ? 1 : 0);
>>>>> >         result = 31 * result + getSequence();
>>>>> >         result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
>>>>> >         return result;
>>>>> >     }
>>>>> >
>>>>> >     @Override
>>>>> >     public String toString() {
>>>>> >         return "SimpleBinaryEvent{" +
>>>>> >                 "id='" + id + '\'' +
>>>>> >                 ", status=" + status +
>>>>> >                 ", sequence=" + sequence +
>>>>> >                 ", time=" + time +
>>>>> >                 '}';
>>>>> >     }
>>>>> >
>>>>> > }
>>>>> >
>>>>> > failure cause:
>>>>> >
>>>>> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
>>>>> > ... 6 more
>>>>> > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
>>>>> >
>>>>> > I am sure I have equals() and hashCode() implemented the way they
>>>>> > should be. I have tried the Objects.hashCode too. In other instances I
>>>>> > have had a CircularReference (and thus a stackOverflow) on
>>>>> > SharedBuffer.toString(), which again points to issues with references
>>>>> > (equality and what not). Without checkpointing turned on it works as
>>>>> > expected. I am running on a local cluster. Is CEP production ready?
>>>>> >
>>>>> > I am using Flink 1.3.2.
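
To illustrate the point about exact equals()/hashCode() made in the reply above, here is a minimal sketch in plain Java. It does not use FlinkCEP and does not reproduce its internals: a java.util.HashMap stands in for a hash-based buffer, and the LooseEvent and ExactEvent classes are hypothetical. It only shows how leaving a field out of the equality contract makes two distinct events indistinguishable to such a structure, so that cleaning up one also removes the other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EqualsContractDemo {

    // Hypothetical event: equals/hashCode deliberately ignore `sequence`.
    static final class LooseEvent {
        final int id;
        final int sequence;

        LooseEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LooseEvent)) return false;
            return id == ((LooseEvent) o).id; // sequence is left out
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    // Hypothetical event: every field takes part in equals/hashCode.
    static final class ExactEvent {
        final int id;
        final int sequence;

        ExactEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ExactEvent)) return false;
            ExactEvent that = (ExactEvent) o;
            return id == that.id && sequence == that.sequence;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, sequence);
        }
    }

    // Stores two distinct loose events, "cleans up" the second one,
    // and returns how many entries are left in the map.
    static int looseEntriesAfterRemove() {
        Map<LooseEvent, String> buffer = new HashMap<>();
        buffer.put(new LooseEvent(1, 95), "first");
        buffer.put(new LooseEvent(1, 96), "second"); // collides with the first key
        buffer.remove(new LooseEvent(1, 96));        // also removes the first event
        return buffer.size();
    }

    // Same steps with the exact contract: only the second event is removed.
    static int exactEntriesAfterRemove() {
        Map<ExactEvent, String> buffer = new HashMap<>();
        buffer.put(new ExactEvent(1, 95), "first");
        buffer.put(new ExactEvent(1, 96), "second");
        buffer.remove(new ExactEvent(1, 96));
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("loose entries left: " + looseEntriesAfterRemove());
        System.out.println("exact entries left: " + exactEntriesAfterRemove());
    }
}
```

With the loose contract the map ends up empty, because the event with sequence 95 was never stored separately from the one with sequence 96. With the exact contract one entry survives. If a relaxed notion of equality is needed elsewhere (for example for "contains" checks), one option is to keep it in a separate comparator or wrapper class rather than in the event's own equals()/hashCode().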