Hi Vishal,

Thanks for checking and glad to hear that your job works after the fix!

As for the equals/hashCode question: if you are asking whether you have to 
implement an exact equals() method and a matching hashCode(), 
then the answer is yes. These methods are used when retrieving and cleaning up 
"outdated" data from FlinkCEP's internal data structures.
As a consequence, ambiguous implementations can lead to the wrong elements 
being cleaned up.
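
To illustrate the point above, here is a minimal sketch. It uses a plain 
java.util.HashMap as a stand-in and is not FlinkCEP's actual buffer 
implementation; the class names LooseEvent, ExactEvent and 
EqualsContractSketch are hypothetical. It only shows how an equals()/hashCode() 
pair that skips a field makes two distinct events indistinguishable to any 
structure that locates entries by equality.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EqualsContractSketch {

    /** Hypothetical event whose equals/hashCode deliberately ignore 'sequence'. */
    static final class LooseEvent {
        final int id;
        final int sequence;

        LooseEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LooseEvent)) return false;
            return id == ((LooseEvent) o).id; // 'sequence' is not compared
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    /** Hypothetical event whose equals/hashCode cover every field. */
    static final class ExactEvent {
        final int id;
        final int sequence;

        ExactEvent(int id, int sequence) {
            this.id = id;
            this.sequence = sequence;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ExactEvent)) return false;
            ExactEvent that = (ExactEvent) o;
            return id == that.id && sequence == that.sequence;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, sequence);
        }
    }

    /** Stores two different loose events; they collapse into one map entry. */
    static int distinctLooseEntries() {
        Map<LooseEvent, String> buffer = new HashMap<>();
        buffer.put(new LooseEvent(1, 95), "first");
        buffer.put(new LooseEvent(1, 96), "second");
        return buffer.size();
    }

    /** Stores two different exact events; both entries survive. */
    static int distinctExactEntries() {
        Map<ExactEvent, String> buffer = new HashMap<>();
        buffer.put(new ExactEvent(1, 95), "first");
        buffer.put(new ExactEvent(1, 96), "second");
        return buffer.size();
    }

    /** Looking up the first loose event returns the value stored for the second. */
    static String looseLookup() {
        Map<LooseEvent, String> buffer = new HashMap<>();
        buffer.put(new LooseEvent(1, 95), "first");
        buffer.put(new LooseEvent(1, 96), "second");
        return buffer.get(new LooseEvent(1, 95));
    }

    public static void main(String[] args) {
        System.out.println("loose entries: " + distinctLooseEntries()); // 1
        System.out.println("exact entries: " + distinctExactEntries()); // 2
        System.out.println("loose lookup:  " + looseLookup());          // second
    }
}
```

If a looser notion of equality is needed elsewhere (for example for a 
"contains" check), one option is to keep equals()/hashCode() exact on the event 
class and do the looser comparison in a separate helper or Comparator.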

Thanks, 
Kostas

> On Jan 21, 2018, at 3:32 PM, Vishal Santoshi <vishal.santo...@gmail.com> 
> wrote:
> 
> Have tested against the 1.5 SNAPSHOT (I simply pulled the source code into 
> my distribution and compiled it into my job jar). Both the test code and the 
> cluster seem to work OK. Have not tested the "savepoint and resume" mode, 
> but restore from checkpoint works: I brought the JM down and restarted it. I 
> have to sanitize the output, but at least the exception is not thrown.
> 
> One thing though and please confirm
> 
> In CEP it seems that a POJO pushed into the window as part of a Pattern match 
> has to have an "exact" equals/hashCode. In my case I had a custom 
> equals/hashCode to enable "contains" in a different context, that is, I had 
> deliberately not included an instance variable in the equals/hashCode 
> contract. Is that a design decision or a requirement in CEP?
> 
> Thanks and Regards.
> 
> 
> 
> 
> On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>> wrote:
> Will do.
> 
> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <fhue...@gmail.com 
> <mailto:fhue...@gmail.com>> wrote:
> We don't have a schedule for bugfix releases but do them based on need.
> AFAIK, a discussion about a 1.4.1 release has not been started yet.
> 
> Would you like to kick that off by sending a mail to the dev mailing list?
> 
> 
> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com 
> <mailto:vishal.santo...@gmail.com>>:
> Thanks.  We will.
> 
>    When is 1.4.1 scheduled for release ? 
> 
> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <wysakowicz.da...@gmail.com 
> <mailto:wysakowicz.da...@gmail.com>> wrote:
> Hi Vishal,
> I think it might be due to this bug: 
> https://issues.apache.org/jira/browse/FLINK-8226
> It was merged for 1.4.1 and 1.5.0. Could you check with these changes applied? 
> That would be really helpful. If the error still persists, could you file a JIRA?
> 
> Regards
> Dawid
> 
> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com 
> > <mailto:vishal.santo...@gmail.com>> wrote:
> >
> > When checkpointing is turned on a simple CEP loop pattern
> >
> >  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
> >         Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
> >         .followedBy("middle").where(checkStatusOn).times(2)
> >         .next("end").where(checkStatusOn).within(Time.minutes(5));
> >
> > I see failures.
> >
> > SimpleBinaryEvent is
> >
> > public class SimpleBinaryEvent implements Serializable {
> >
> > private int id;
> > private int sequence;
> > private boolean status;
> > private long time;
> >
> > public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
> >
> >     this.id = id;
> >     this.sequence = sequence;
> >     this.status = status;
> >     this.time = time;
> > }
> > public int getId() {
> >     return id;
> > }
> > public int getSequence() {
> >     return sequence;
> > }
> > public boolean isStatus() {
> >     return status;
> > }
> > public long getTime() {
> >     return time;
> > }
> > @Override
> > public boolean equals(Object o) {
> >     if (this == o) return true;
> >     if (o == null || getClass() != o.getClass()) return false;
> >
> >     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
> >
> >     if (getId() != that.getId()) return false;
> >     if (isStatus() != that.isStatus()) return false;
> >     if (getSequence() != that.getSequence()) return false;
> >     return getTime() == that.getTime();
> > }
> >
> > @Override
> > public int hashCode() {
> >     //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
> >     int result = getId();
> >     result = 31 * result + (isStatus() ? 1 : 0);
> >     result = 31 * result + getSequence();
> >     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
> >     return result;
> > }
> >
> > @Override
> > public String toString() {
> >     return "SimpleBinaryEvent{" +
> >             "id='" + id + '\'' +
> >             ", status=" + status +
> >             ", sequence=" + sequence +
> >             ", time=" + time +
> >             '}';
> > }
> >
> > }
> >
> > failure cause:
> >
> > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> > operator KeyedCEPPatternOperator -> Map (1/1).
> > ... 6 more
> > Caused by: java.util.concurrent.ExecutionException: 
> > java.lang.IllegalStateException: Could not find id for entry: 
> > SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', 
> > status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
> >
> > I am sure I have the equals() and hashCode() implemented the way it should 
> > be. I have tried the Objects.hashCode too. In other instances I have had 
> > CircularReference ( and thus stackOverflow ) on SharedBuffer.toString(), 
> > which again points to issues with references ( equality and what not ). 
> > Without checkpointing turned on it works as expected. I am running on a 
> > local cluster. Is CEP production ready ?
> >
> > I am using 1.3.2 Flink
> >
> 
> 
> 
> 
> 
