Repository: flink
Updated Branches:
  refs/heads/master b13914799 -> 449c84b0e


[FLINK-7170] [cep] Fix until condition when the contiguity is strict

This closes #4318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/449c84b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/449c84b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/449c84b0

Branch: refs/heads/master
Commit: 449c84b0e21c6cfe43ab9ed697a69fa09308b381
Parents: b139147
Author: Dian Fu <fudian...@alibaba-inc.com>
Authored: Thu Jul 13 17:21:30 2017 +0800
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Tue Jul 25 14:30:59 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/nfa/compiler/NFACompiler.java     | 30 ++++++++-----
 .../flink/cep/nfa/UntilConditionITCase.java     | 47 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index e160e4a..62464d1 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -630,10 +630,12 @@ public class NFACompiler {
 
                        final IterativeCondition<T> ignoreCondition = 
extendWithUntilCondition(
                                getInnerIgnoreCondition(currentPattern),
-                               untilCondition);
+                               untilCondition,
+                               false);
                        final IterativeCondition<T> takeCondition = 
extendWithUntilCondition(
                                getTakeCondition(currentPattern),
-                               untilCondition);
+                               untilCondition,
+                               true);
 
                        final IterativeCondition<T> proceedCondition = 
getTrueFunction();
                        final State<T> loopingState = 
createState(currentPattern.getName(), State.StateType.Normal);
@@ -664,7 +666,8 @@ public class NFACompiler {
                private State<T> createInitMandatoryStateOfOneOrMore(final 
State<T> sinkState) {
                        final IterativeCondition<T> takeCondition = 
extendWithUntilCondition(
                                getTakeCondition(currentPattern),
-                               (IterativeCondition<T>) 
currentPattern.getUntilCondition()
+                               (IterativeCondition<T>) 
currentPattern.getUntilCondition(),
+                               true
                        );
 
                        final IterativeCondition<T> ignoreCondition = 
getIgnoreCondition(currentPattern);
@@ -683,7 +686,8 @@ public class NFACompiler {
                private State<T> createInitOptionalStateOfZeroOrMore(final 
State<T> loopingState, final State<T> lastSink) {
                        final IterativeCondition<T> takeCondition = 
extendWithUntilCondition(
                                getTakeCondition(currentPattern),
-                               (IterativeCondition<T>) 
currentPattern.getUntilCondition()
+                               (IterativeCondition<T>) 
currentPattern.getUntilCondition(),
+                               true
                        );
 
                        final IterativeCondition<T> ignoreFunction = 
getIgnoreCondition(currentPattern);
@@ -697,14 +701,16 @@ public class NFACompiler {
                 *
                 * @param condition the condition to extend
                 * @param untilCondition the until condition to join with the 
given condition
+                * @param isTakeCondition whether the {@code condition} is for 
{@code TAKE} edge
                 * @return condition with AND applied or the original condition
                 */
                private IterativeCondition<T> extendWithUntilCondition(
                                IterativeCondition<T> condition,
-                               IterativeCondition<T> untilCondition) {
+                               IterativeCondition<T> untilCondition,
+                               boolean isTakeCondition) {
                        if (untilCondition != null && condition != null) {
                                return new AndCondition<>(new 
NotCondition<>(untilCondition), condition);
-                       } else if (untilCondition != null) {
+                       } else if (untilCondition != null && isTakeCondition) {
                                return new NotCondition<>(untilCondition);
                        }
 
@@ -741,7 +747,8 @@ public class NFACompiler {
                        if (currentGroupPattern != null && 
currentGroupPattern.getUntilCondition() != null) {
                                innerIgnoreCondition = extendWithUntilCondition(
                                        innerIgnoreCondition,
-                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition());
+                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition(),
+                                       false);
                        }
                        return innerIgnoreCondition;
                }
@@ -781,7 +788,8 @@ public class NFACompiler {
                        if (currentGroupPattern != null && 
currentGroupPattern.getUntilCondition() != null) {
                                ignoreCondition = extendWithUntilCondition(
                                        ignoreCondition,
-                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition());
+                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition(),
+                                       false);
                        }
                        return ignoreCondition;
                }
@@ -797,7 +805,8 @@ public class NFACompiler {
                        if (currentGroupPattern != null && 
currentGroupPattern.getUntilCondition() != null) {
                                takeCondition = extendWithUntilCondition(
                                        takeCondition,
-                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition());
+                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition(),
+                                       true);
                        }
                        return takeCondition;
                }
@@ -811,7 +820,8 @@ public class NFACompiler {
                        if (currentGroupPattern != null && 
currentGroupPattern.getUntilCondition() != null) {
                                trueCondition = extendWithUntilCondition(
                                        trueCondition,
-                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition());
+                                       (IterativeCondition<T>) 
currentGroupPattern.getUntilCondition(),
+                                       true);
                        }
                        return trueCondition;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index d56e883..639541d 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -194,6 +194,53 @@ public class UntilConditionITCase {
        }
 
        @Test
+       public void testUntilConditionFollowedByOneOrMoreConsecutive2() throws 
Exception {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event startEvent = new Event(40, "c", 1.0);
+               Event middleEvent1 = new Event(41, "a", 2.0);
+               Event middleEvent2 = new Event(42, "b", 3.0);
+               Event middleEvent3 = new Event(43, "a", 4.0);
+               Event breaking = new Event(45, "a", 5.0);
+               Event ignored = new Event(46, "a", 6.0);
+
+               inputEvents.add(new StreamRecord<>(startEvent, 1));
+               inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+               inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+               inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+               inputEvents.add(new StreamRecord<>(breaking, 7));
+               inputEvents.add(new StreamRecord<>(ignored, 8));
+
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().consecutive().until(UNTIL_CONDITION)
+                       .followedBy("end").where(
+                               UNTIL_CONDITION
+                       );
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(startEvent, middleEvent1, breaking)
+               ));
+               assertTrue(nfa.isEmpty());
+       }
+
+       @Test
        public void testUntilConditionFollowedByZeroOrMore() throws Exception {
                List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

Reply via email to