Thanks. Confirmed through tests the above behavior.

On Tue, Jan 23, 2018 at 4:09 AM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Vishal,
>
> Thanks for checking and glad to hear that your job works after the fix!
>
> As for the equals/hashcode question, if your question is if you have to
> implement exact equals() method and the corresponding hashcode()
> then the answer is yes. These methods are used when retrieving and
> cleaning up “outdated” data from FlinkCEP’s internal datastructures.
> As a consequence, ambiguous implementations can lead to the wrong elements
> being cleaned up.
>
> Thanks,
> Kostas
>
> On Jan 21, 2018, at 3:32 PM, Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> Have tested against the 1.5 SNAPShot ( I simply pulled  the source code
> into my distribution and compiled it into my job jar ). Both the test code
> and the cluster seems to work ok. Have not tested the "savepoint  and
> resume" mode but restore from checkpoint works. I brought the JM down and
> restarted it.  I have to sanitize the output but at least the exception is
> not thrown.
>
> One thing though and please confirm
>
> In CEP it seems that a POJO pushed into the window as part of Pattern
> match has to have an  "exact" equals/hashcode.  As in in my case I had a
> custom equals/hashcode for enabling "contains" for a different context as
> in I had deliberately not included an instance variable in the
> equals/hashcode  contract. Is that a design decision or a requirement in
> CEP ?
>
> Thanks and Regards.
>
>
>
>
> On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Will do.
>>
>> On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> We don't have a schedule for bugfix releases but do them based on need.
>>> AFAIK, a discussion about a 1.4.1 release has not been started yet.
>>>
>>> Would you like to kick that off by sending a mail to the dev mailing
>>> list?
>>>
>>>
>>> 2018-01-12 16:41 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> Thanks.  We will.
>>>>
>>>>    When is 1.4.1 scheduled for release ?
>>>>
>>>> On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <
>>>> wysakowicz.da...@gmail.com> wrote:
>>>>
>>>>> Hi Vishal,
>>>>> I think it might be due to this bug: https://issues.apache.org/jira
>>>>> /browse/FLINK-8226
>>>>> It was merged for 1.4.1 and 1.5.0. Could you check with this changes
>>>>> applied? Would be really helpful. If the error still persists could you
>>>>> file a jira?
>>>>>
>>>>> Regards
>>>>> Dawid
>>>>>
>>>>> > On 11 Jan 2018, at 19:49, Vishal Santoshi <vishal.santo...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> > When checkpointing is turned on a simple CEP loop pattern
>>>>> >
>>>>> >  private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern
>>>>> = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("star
>>>>> t").where(checkStatusOn)
>>>>> >         .followedBy("middle").where(checkStatusOn).times(2)
>>>>> >         .next("end").where(checkStatusOn).within(Time.minutes(5))
>>>>> >
>>>>> > I see failures.
>>>>> >
>>>>> > SimpleBinaryEvent is
>>>>> >
>>>>> > public class SimpleBinaryEvent implements Serializable {
>>>>> >
>>>>> > private int id;
>>>>> > private int sequence;
>>>>> > private boolean status;
>>>>> > private long time;
>>>>> >
>>>>> > public SimpleBinaryEvent(int id, int sequence, boolean status , long
>>>>> time) {
>>>>> >
>>>>> > this.id
>>>>> >  = id;
>>>>> >     this.sequence = sequence;
>>>>> >     this.status = status;
>>>>> >     this.time = time;
>>>>> > }
>>>>> > public int getId() {
>>>>> >     return id;
>>>>> > }
>>>>> > public int getSequence() {
>>>>> >     return sequence;
>>>>> > }
>>>>> > public boolean isStatus() {
>>>>> >     return status;
>>>>> > }
>>>>> > public long getTime() {
>>>>> >     return time;
>>>>> > }
>>>>> > @Override
>>>>> > public boolean equals(Object o) {
>>>>> >     if (this == o) return true;
>>>>> >     if (o == null || getClass() != o.getClass()) return false;
>>>>> >
>>>>> >     SimpleBinaryEvent that = (SimpleBinaryEvent) o;
>>>>> >
>>>>> >     if (getId() != that.getId()) return false;
>>>>> >     if (isStatus() != that.isStatus()) return false;
>>>>> >     if (getSequence() != that.getSequence()) return false;
>>>>> >     return getTime() == that.getTime();
>>>>> > }
>>>>> >
>>>>> > @Override
>>>>> > public int hashCode() {
>>>>> >     //return Objects.hash(getId(),isStatus(),
>>>>> getSequence(),getTime());
>>>>> >     int result = getId();
>>>>> >     result = 31 * result + (isStatus() ? 1 : 0);
>>>>> >     result = 31 * result + getSequence();
>>>>> >     result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
>>>>> >     return result;
>>>>> > }
>>>>> >
>>>>> > @Override
>>>>> > public String toString() {
>>>>> >     return "SimpleBinaryEvent{" +
>>>>> >             "id='" + id + '\'' +
>>>>> >             ", status=" + status +
>>>>> >             ", sequence=" + sequence +
>>>>> >             ", time=" + time +
>>>>> >             '}';
>>>>> > }
>>>>> >
>>>>> > }
>>>>> >
>>>>> > failure cause:
>>>>> >
>>>>> > Caused by: java.lang.Exception: Could not materialize checkpoint 2
>>>>> for operator KeyedCEPPatternOperator -> Map (1/1).
>>>>> > ... 6 more
>>>>> > Caused by: java.util.concurrent.ExecutionException:
>>>>> java.lang.IllegalStateException: Could not find id for entry:
>>>>> SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',
>>>>> status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
>>>>> >
>>>>> > I am sure I have the equals() and hashCode() implemented the way it
>>>>> should be. I have tried the Objects.hashCode too. In other instances I 
>>>>> have
>>>>> had CircularReference ( and thus stackOverflow ) on
>>>>> SharedBuffer.toString(), which again points to issues with references (
>>>>> equality and what not ). Without checkpointing turned on it works as
>>>>> expected. I am running on a local cluster. Is CEP production ready ?
>>>>> >
>>>>> > I am using 1.3.2 Flink
>>>>> >
>>>>>
>>>>>
>>>>
>>>
>>
>
>

Reply via email to