Implemented hashcode() on both the DEVICE_ID and the MOTION_DIRECTION ( the
pattern is built around this one ). Still giving me the following error:


java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Could not find previous
entry with key: first event, value:
{"DEVICE_ID":7435b060-d2fb-11e8-8da5-9779854d8172,"TIME_STAMP":11/16/2018
06:34:33.994 
pm,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
and timestamp: 1542393274928. This can indicate that either you did
not implement the equals() and hashCode() methods of your input
elements properly or that the element belonging to that entry has been
already pruned.
        at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
        at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at 
java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
        at 
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
        ... 7 more


On Thu, Nov 15, 2018 at 8:13 AM Chesnay Schepler <ches...@apache.org> wrote:

> Does the issue persist if you implement hashCode() in IoTEvent like this:
>
> @Overridepublic int hashCode() {
>    return this.DEVICE_ID.hashCode();}
>
>
> On 15.11.2018 13:58, Steve Bistline wrote:
>
> Sure, here it is.
>
> Thank you so very much for having a look at this.
>
> Steve
>
>
> On Thu, Nov 15, 2018 at 4:18 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Can you provide us with the implementation of your Event and IoTEvent
>> classes?
>>
>> On 15.11.2018 06:10, Steve Bistline wrote:
>>
>> Any thoughts on where to start with this error would be appreciated.
>>
>> Caused by: java.lang.IllegalStateException: Could not find previous entry 
>> with key: first event, value: 
>> {"DEVICE_ID":f8a395a0-d3e2-11e8-b050-9779854d8172,"TIME_STAMP":11/15/2018 
>> 02:29:30.343 
>> am,"TEMPERATURE":0.0,"HUMIDITY":"0.0","LIGHT_WHITE":0.0,"PROCX":0.0,"MOTION_DIRECTION":0,"MOTION_SPEED":0,"MOOD_STATE":0,"VISION_PEOPLE":0,"AUDIO1":0.0}
>>  and timestamp: 1542248971585. This can indicate that either you did not 
>> implement the equals() and hashCode() methods of your input elements 
>> properly or that the element belonging to that entry has been already pruned.
>>
>> =====================================================
>>
>> CODE HERE
>>
>> =====================================================
>>
>> //kinesisConsumerConfig.list(System.out);       // Consume the data streams 
>> from AWS Kinesis stream       DataStream<Event> dataStream = 
>> env.addSource(new FlinkKinesisConsumer<>(
>>                pt.getRequired("stream"),               new EventSchema(),    
>>            kinesisConsumerConfig))
>>                .name("Kinesis Stream Consumer");       
>> System.out.printf("Print dataStream\n");    //dataStream.print();       
>> DataStream<Event> kinesisStream = dataStream
>>                .assignTimestampsAndWatermarks(new 
>> TimeLagWatermarkGenerator())
>>                .map(event -> (IoTEvent) event);       // Prints the mapped 
>> records from the Kinesis stream       //kinesisStream.print();    
>> //System.out.printf("Print kinesisStream\n");              Pattern<Event, ?> 
>> pattern = Pattern
>>                .<Event> begin("first event").subtype(IoTEvent.class)
>>                .where(new IterativeCondition<IoTEvent>()
>>        {
>>            private static final long serialVersionUID = 
>> -6301755149429716724L;           @Override
>>            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) 
>> throws Exception {
>>                PatternConstants.MOTION_FIRST = value.getMotionDir();         
>>   return value.getMotionDir() != PatternConstants.MOTION_NA;       }
>>        })
>>                .next("second")
>>                .subtype(IoTEvent.class)
>>                .where(new IterativeCondition<IoTEvent>() {
>>                    private static final long serialVersionUID = 
>> 2392863109523984059L;                   @Override
>>                    public boolean filter(IoTEvent value, Context<IoTEvent> 
>> ctx) throws Exception {
>>
>>                        return value.getMotionDir() != 
>> PatternConstants.MOTION_NA && value.getMotionDir() != 
>> PatternConstants.MOTION_FIRST;                   }
>>                })
>>                .next("third")
>>                .subtype(IoTEvent.class)
>>                .where(new IterativeCondition<IoTEvent>() {
>>                    private static final long serialVersionUID = 
>> 2392863109523984059L;                   @Override
>>                    public boolean filter(IoTEvent value, Context<IoTEvent> 
>> ctx) throws Exception {
>>
>>                        return value.getMotionDir() != 
>> PatternConstants.MOTION_NA && value.getMotionDir() == 
>> PatternConstants.MOTION_FIRST;                   }
>>                })
>>                .next("fourth")
>>                .subtype(IoTEvent.class)
>>                .where(new IterativeCondition<IoTEvent>() {
>>                    private static final long serialVersionUID = 
>> 2392863109523984059L;                   @Override
>>                    public boolean filter(IoTEvent value, Context<IoTEvent> 
>> ctx) throws Exception {
>>
>>                        return value.getMotionDir() != 
>> PatternConstants.MOTION_NA && value.getMotionDir() != 
>> PatternConstants.MOTION_FIRST;                   }
>>                })
>>                .within(Time.seconds(10));
>>
>>
>>
>

Reply via email to