[ https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostas Kloudas updated FLINK-6578: ---------------------------------- Priority: Blocker (was: Major) > SharedBuffer creates self-loops when having elements with same > value/timestamp. > ------------------------------------------------------------------------------- > > Key: FLINK-6578 > URL: https://issues.apache.org/jira/browse/FLINK-6578 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.3.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.3.0 > > > This is a test that fails with the current implementation due to the fact > that the looping state accepts the two {{middleEvent1}} elements but the > shared buffer cannot distinguish between them and gets trapped in an infinite > loop leading to running out of memory. > {code} > @Test > public void testEagerZeroOrMoreSameElement() { > List<StreamRecord<Event>> inputEvents = new ArrayList<>(); > Event startEvent = new Event(40, "c", 1.0); > Event middleEvent1 = new Event(41, "a", 2.0); > Event middleEvent2 = new Event(42, "a", 3.0); > Event middleEvent3 = new Event(43, "a", 4.0); > Event end1 = new Event(44, "b", 5.0); > inputEvents.add(new StreamRecord<>(startEvent, 1)); > inputEvents.add(new StreamRecord<>(middleEvent1, 3)); > inputEvents.add(new StreamRecord<>(middleEvent1, 3)); > inputEvents.add(new StreamRecord<>(middleEvent1, 3)); > inputEvents.add(new StreamRecord<>(middleEvent2, 4)); > inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5)); > inputEvents.add(new StreamRecord<>(middleEvent3, 6)); > inputEvents.add(new StreamRecord<>(middleEvent3, 6)); > inputEvents.add(new StreamRecord<>(end1, 7)); > Pattern<Event, ?> pattern = > Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { > private static final long serialVersionUID = > 5726188262756267490L; > @Override > public boolean filter(Event value) throws Exception { > return value.getName().equals("c"); > } > }).followedBy("middle").where(new SimpleCondition<Event>() { > private static final long serialVersionUID = > 5726188262756267490L; > @Override > public boolean filter(Event value) throws Exception { > return value.getName().equals("a"); > } > }).oneOrMore().optional().followedBy("end1").where(new > SimpleCondition<Event>() { > private static final long serialVersionUID = > 5726188262756267490L; > @Override > public boolean filter(Event value) throws Exception { > return value.getName().equals("b"); > } > }); > NFA<Event> nfa = NFACompiler.compile(pattern, > Event.createTypeSerializer(), false); > final List<List<Event>> resultingPatterns = > feedNFA(inputEvents, nfa); > compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList( > Lists.newArrayList(startEvent, middleEvent1, > middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1), > Lists.newArrayList(startEvent, middleEvent1, > middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1), > Lists.newArrayList(startEvent, middleEvent1, > middleEvent1, middleEvent1, middleEvent2, end1), > Lists.newArrayList(startEvent, middleEvent1, > middleEvent1, middleEvent1, end1), > Lists.newArrayList(startEvent, middleEvent1, > middleEvent1, end1), > Lists.newArrayList(startEvent, middleEvent1, > end1), > Lists.newArrayList(startEvent, end1) > )); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)