[ 
https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-6578:
----------------------------------
    Priority: Blocker  (was: Major)

> SharedBuffer creates self-loops when having elements with same 
> value/timestamp.
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-6578
>                 URL: https://issues.apache.org/jira/browse/FLINK-6578
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> This is a test that fails with the current implementation due to the fact 
> that the looping state accepts the two {{middleEvent1}} elements but the 
> shared buffer cannot distinguish between them and gets trapped in an infinite 
> loop leading to running out of memory.
> {code}
> @Test
>       public void testEagerZeroOrMoreSameElement() {
>               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
>               Event startEvent = new Event(40, "c", 1.0);
>               Event middleEvent1 = new Event(41, "a", 2.0);
>               Event middleEvent2 = new Event(42, "a", 3.0);
>               Event middleEvent3 = new Event(43, "a", 4.0);
>               Event end1 = new Event(44, "b", 5.0);
>               inputEvents.add(new StreamRecord<>(startEvent, 1));
>               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
>               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
>               inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
>               inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>               inputEvents.add(new StreamRecord<>(middleEvent3, 6));
>               inputEvents.add(new StreamRecord<>(end1, 7));
>               Pattern<Event, ?> pattern = 
> Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
>                       private static final long serialVersionUID = 
> 5726188262756267490L;
>                       @Override
>                       public boolean filter(Event value) throws Exception {
>                               return value.getName().equals("c");
>                       }
>               }).followedBy("middle").where(new SimpleCondition<Event>() {
>                       private static final long serialVersionUID = 
> 5726188262756267490L;
>                       @Override
>                       public boolean filter(Event value) throws Exception {
>                               return value.getName().equals("a");
>                       }
>               }).oneOrMore().optional().followedBy("end1").where(new 
> SimpleCondition<Event>() {
>                       private static final long serialVersionUID = 
> 5726188262756267490L;
>                       @Override
>                       public boolean filter(Event value) throws Exception {
>                               return value.getName().equals("b");
>                       }
>               });
>               NFA<Event> nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>               final List<List<Event>> resultingPatterns = 
> feedNFA(inputEvents, nfa);
>               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
>                               Lists.newArrayList(startEvent, middleEvent1, 
> middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
>                               Lists.newArrayList(startEvent, middleEvent1, 
> middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
>                               Lists.newArrayList(startEvent, middleEvent1, 
> middleEvent1, middleEvent1, middleEvent2, end1),
>                               Lists.newArrayList(startEvent, middleEvent1, 
> middleEvent1, middleEvent1, end1),
>                               Lists.newArrayList(startEvent, middleEvent1, 
> middleEvent1, end1),
>                               Lists.newArrayList(startEvent, middleEvent1, 
> end1),
>                               Lists.newArrayList(startEvent, end1)
>               ));
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to