Repository: flink
Updated Branches:
  refs/heads/master b3ffd919f -> d21d5d632


[FLINK-7147] [cep] Support greedy quantifier in CEP

This closes #4296.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d21d5d63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d21d5d63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d21d5d63

Branch: refs/heads/master
Commit: d21d5d632374f9915140190d4d249379a72c90cf
Parents: b3ffd91
Author: Dian Fu <fudian...@alibaba-inc.com>
Authored: Tue Jul 11 16:03:42 2017 +0800
Committer: Dawid Wysakowicz <dwysakow...@apache.org>
Committed: Thu Aug 24 09:04:02 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  61 +-
 .../flink/cep/scala/pattern/Pattern.scala       |  12 +
 .../apache/flink/cep/nfa/StateTransition.java   |   4 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |  86 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |  27 +
 .../apache/flink/cep/pattern/Quantifier.java    |  12 +-
 .../org/apache/flink/cep/nfa/GreedyITCase.java  | 907 +++++++++++++++++++
 7 files changed, 1093 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 4b13bb3..91125ca 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -163,8 +163,9 @@ In FlinkCEP, looping patterns can be specified using these 
methods: `pattern.one
 more occurrences of a given event (e.g. the `b+` mentioned previously); and 
`pattern.times(#ofTimes)`, for patterns that
 expect a specific number of occurrences of a given type of event, e.g. 4 
`a`'s; and `pattern.times(#fromTimes, #toTimes)`,
 for patterns that expect a specific minimum number of occurrences and maximum 
number of occurrences of a given type of event,
-e.g. 2-4 `a`s. All patterns, looping or not, can be made optional using the 
`pattern.optional()` method. For a pattern
-named `start`, the following are valid quantifiers:
+e.g. 2-4 `a`s. Looping patterns can be made greedy using the 
`pattern.greedy()` method, but group patterns cannot currently be made
+greedy. All patterns, looping or not, can be made optional using the 
`pattern.optional()` method.
+For a pattern named `start`, the following are valid quantifiers:
 
  <div class="codetabs" markdown="1">
  <div data-lang="java" markdown="1">
@@ -178,21 +179,35 @@ named `start`, the following are valid quantifiers:
  // expecting 2, 3 or 4 occurrences
  start.times(2, 4);
 
+ // expecting 2, 3 or 4 occurrences and repeating as many times as possible
+ start.times(2, 4).greedy();
+
  // expecting 0, 2, 3 or 4 occurrences
  start.times(2, 4).optional();
 
+ // expecting 0, 2, 3 or 4 occurrences and repeating as many times as possible
+ start.times(2, 4).optional().greedy();
+
  // expecting 1 or more occurrences
  start.oneOrMore();
 
+ // expecting 1 or more occurrences and repeating as many times as possible
+ start.oneOrMore().greedy();
+
  // expecting 0 or more occurrences
  start.oneOrMore().optional();
 
+ // expecting 0 or more occurrences and repeating as many times as possible
+ start.oneOrMore().optional().greedy();
+
  // expecting 2 or more occurrences
  start.timesOrMore(2);
 
- // expecting 0, 2 or more occurrences
- start.timesOrMore(2).optional();
+ // expecting 2 or more occurrences and repeating as many times as possible
+ start.timesOrMore(2).greedy();
 
+ // expecting 0, 2 or more occurrences and repeating as many times as possible
+ start.timesOrMore(2).optional().greedy();
  {% endhighlight %}
  </div>
 
@@ -207,20 +222,38 @@ named `start`, the following are valid quantifiers:
  // expecting 2, 3 or 4 occurrences
  start.times(2, 4);
 
+ // expecting 2, 3 or 4 occurrences and repeating as many times as possible
+ start.times(2, 4).greedy();
+
  // expecting 0, 2, 3 or 4 occurrences
  start.times(2, 4).optional();
 
+ // expecting 0, 2, 3 or 4 occurrences and repeating as many times as possible
+ start.times(2, 4).optional().greedy();
+
  // expecting 1 or more occurrences
  start.oneOrMore()
 
+ // expecting 1 or more occurrences and repeating as many times as possible
+ start.oneOrMore().greedy();
+
  // expecting 0 or more occurrences
  start.oneOrMore().optional()
 
+ // expecting 0 or more occurrences and repeating as many times as possible
+ start.oneOrMore().optional().greedy();
+
  // expecting 2 or more occurrences
  start.timesOrMore(2);
 
+ // expecting 2 or more occurrences and repeating as many times as possible
+ start.timesOrMore(2).greedy();
+
  // expecting 0, 2 or more occurrences
  start.timesOrMore(2).optional();
+
+ // expecting 0, 2 or more occurrences and repeating as many times as possible
+ start.timesOrMore(2).optional().greedy();
  {% endhighlight %}
  </div>
  </div>
@@ -536,6 +569,16 @@ pattern.oneOrMore().optional();
           </td>
        </tr>
        <tr>
+          <td><strong>greedy()</strong></td>
+          <td>
+              <p>Specifies that this pattern is greedy, i.e. it will repeat as 
many times as possible. This is only applicable
+              to quantifiers, and group patterns are not currently 
supported.</p>
+{% highlight java %}
+pattern.oneOrMore().greedy();
+{% endhighlight %}
+          </td>
+       </tr>
+       <tr>
           <td><strong>consecutive()</strong><a 
name="consecutive_java"></a></td>
           <td>
               <p>Works in conjunction with <code>oneOrMore()</code> and 
<code>times()</code> and imposes strict contiguity between the matching
@@ -718,6 +761,16 @@ pattern.oneOrMore().optional()
           </td>
        </tr>
        <tr>
+          <td><strong>greedy()</strong></td>
+          <td>
+             <p>Specifies that this pattern is greedy, i.e. it will repeat as 
many times as possible. This is only applicable
+             to quantifiers, and group patterns are not currently 
supported.</p>
+{% highlight scala %}
+pattern.oneOrMore().greedy()
+{% endhighlight %}
+          </td>
+       </tr>
+       <tr>
           <td><strong>consecutive()</strong><a 
name="consecutive_scala"></a></td>
           <td>
             <p>Works in conjunction with <code>oneOrMore()</code> and 
<code>times()</code> and imposes strict contiguity between the matching

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 5b41b90..dba328c 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -353,6 +353,18 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Specifies that this pattern is greedy.
+    * This means as many events as possible will be matched to this pattern.
+    *
+    * @return The same pattern with { @link Quantifier#greedy} set to true.
+    * @throws MalformedPatternException if the quantifier is not applicable to 
this pattern.
+    */
+  def greedy: Pattern[T, F] = {
+    jPattern.greedy()
+    this
+  }
+
+  /**
     * Specifies exact number of times that this pattern should be matched.
     *
     * @param times number of times matching event must appear

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index bb61e09..e2fd900 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -76,6 +76,10 @@ public class StateTransition<T> implements Serializable {
                return newCondition;
        }
 
+       public void setCondition(IterativeCondition<T> condition) {
+               this.newCondition = condition;
+       }
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof StateTransition) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 4d4baca..593c94f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -117,6 +117,7 @@ public class NFACompiler {
                private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new 
HashMap<>();
                private Pattern<T, ?> currentPattern;
                private Pattern<T, ?> followingPattern;
+               private Map<String, State<T>> originalStateMap = new 
HashMap<>();
 
                NFAFactoryCompiler(final Pattern<T, ?> pattern) {
                        this.currentPattern = pattern;
@@ -382,6 +383,21 @@ public class NFACompiler {
                        return copyOfSink;
                }
 
+               private State<T> copy(final State<T> state) {
+                       final State<T> copyOfState = createState(
+                               
NFAStateNameHandler.getOriginalNameFromInternal(state.getName()),
+                               state.getStateType());
+                       for (StateTransition<T> tStateTransition : 
state.getStateTransitions()) {
+                               copyOfState.addStateTransition(
+                                       tStateTransition.getAction(),
+                                       
tStateTransition.getTargetState().equals(tStateTransition.getSourceState())
+                                                       ? copyOfState
+                                                       : 
tStateTransition.getTargetState(),
+                                       tStateTransition.getCondition());
+                       }
+                       return copyOfState;
+               }
+
                private void addStopStates(final State<T> state) {
                        for (Tuple2<IterativeCondition<T>, String> 
notCondition: getCurrentNotCondition()) {
                                final State<T> stopState = 
createStopState(notCondition.f0, notCondition.f1);
@@ -421,6 +437,15 @@ public class NFACompiler {
                                untilCondition,
                                true);
 
+                       if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)
 &&
+                               times.getFrom() != times.getTo()) {
+                               if (untilCondition != null) {
+                                       State<T> sinkStateCopy = 
copy(sinkState);
+                                       
originalStateMap.put(sinkState.getName(), sinkStateCopy);
+                               }
+                               updateWithGreedyCondition(sinkState, 
takeCondition);
+                       }
+
                        for (int i = times.getFrom(); i < times.getTo(); i++) {
                                lastSink = createSingletonState(lastSink, 
proceedState, takeCondition, innerIgnoreCondition, true);
                                addStopStateToLooping(lastSink);
@@ -526,18 +551,32 @@ public class NFACompiler {
                                return createGroupPatternState((GroupPattern) 
currentPattern, sinkState, proceedState, isOptional);
                        }
 
-                       final IterativeCondition<T> trueFunction = 
getTrueFunction();
-
                        final State<T> singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        // if event is accepted then all notPatterns previous 
to the optional states are no longer valid
                        final State<T> sink = 
copyWithoutTransitiveNots(sinkState);
                        singletonState.addTake(sink, takeCondition);
 
+                       // if no element accepted the previous nots are still 
valid.
+                       final IterativeCondition<T> proceedCondition = 
getTrueFunction();
+
                        // for the first state of a group pattern, its PROCEED 
edge should point to the following state of
                        // that group pattern and the edge will be added at the 
end of creating the NFA for that group pattern
                        if (isOptional && !headOfGroup(currentPattern)) {
-                               // if no element accepted the previous nots are 
still valid.
-                               singletonState.addProceed(proceedState, 
trueFunction);
+                               if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+                                       final IterativeCondition<T> 
untilCondition =
+                                               (IterativeCondition<T>) 
currentPattern.getUntilCondition();
+                                       if (untilCondition != null) {
+                                               singletonState.addProceed(
+                                                       
originalStateMap.get(proceedState.getName()),
+                                                       new 
AndCondition<>(proceedCondition, untilCondition));
+                                       }
+                                       singletonState.addProceed(proceedState,
+                                               untilCondition != null
+                                                       ? new 
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+                                                       : proceedCondition);
+                               } else {
+                                       singletonState.addProceed(proceedState, 
proceedCondition);
+                               }
                        }
 
                        if (ignoreCondition != null) {
@@ -569,11 +608,12 @@ public class NFACompiler {
                        final State<T> sinkState,
                        final State<T> proceedState,
                        final boolean isOptional) {
-                       final IterativeCondition<T> trueFunction = 
getTrueFunction();
+                       final IterativeCondition<T> proceedCondition = 
getTrueFunction();
 
                        Pattern<T, ?> oldCurrentPattern = currentPattern;
                        Pattern<T, ?> oldFollowingPattern = followingPattern;
                        GroupPattern<T, ?> oldGroupPattern = 
currentGroupPattern;
+
                        State<T> lastSink = sinkState;
                        currentGroupPattern = groupPattern;
                        currentPattern = groupPattern.getRawPattern();
@@ -582,7 +622,7 @@ public class NFACompiler {
                        if (isOptional) {
                                // for the first state of a group pattern, its 
PROCEED edge should point to
                                // the following state of that group pattern
-                               lastSink.addProceed(proceedState, trueFunction);
+                               lastSink.addProceed(proceedState, 
proceedCondition);
                        }
                        currentPattern = oldCurrentPattern;
                        followingPattern = oldFollowingPattern;
@@ -600,19 +640,20 @@ public class NFACompiler {
                private State<T> createLoopingGroupPatternState(
                        final GroupPattern<T, ?> groupPattern,
                        final State<T> sinkState) {
-                       final IterativeCondition<T> trueFunction = 
getTrueFunction();
+                       final IterativeCondition<T> proceedCondition = 
getTrueFunction();
 
                        Pattern<T, ?> oldCurrentPattern = currentPattern;
                        Pattern<T, ?> oldFollowingPattern = followingPattern;
                        GroupPattern<T, ?> oldGroupPattern = 
currentGroupPattern;
+
                        final State<T> dummyState = 
createState(currentPattern.getName(), State.StateType.Normal);
                        State<T> lastSink = dummyState;
                        currentGroupPattern = groupPattern;
                        currentPattern = groupPattern.getRawPattern();
                        lastSink = createMiddleStates(lastSink);
                        lastSink = convertPattern(lastSink);
-                       lastSink.addProceed(sinkState, trueFunction);
-                       dummyState.addProceed(lastSink, trueFunction);
+                       lastSink.addProceed(sinkState, proceedCondition);
+                       dummyState.addProceed(lastSink, proceedCondition);
                        currentPattern = oldCurrentPattern;
                        followingPattern = oldFollowingPattern;
                        currentGroupPattern = oldGroupPattern;
@@ -643,9 +684,23 @@ public class NFACompiler {
                                untilCondition,
                                true);
 
-                       final IterativeCondition<T> proceedCondition = 
getTrueFunction();
+                       IterativeCondition<T> proceedCondition = 
getTrueFunction();
                        final State<T> loopingState = 
createState(currentPattern.getName(), State.StateType.Normal);
-                       loopingState.addProceed(sinkState, proceedCondition);
+
+                       if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+                               if (untilCondition != null) {
+                                       State<T> sinkStateCopy = 
copy(sinkState);
+                                       loopingState.addProceed(sinkStateCopy, 
new AndCondition<>(proceedCondition, untilCondition));
+                                       
originalStateMap.put(sinkState.getName(), sinkStateCopy);
+                               }
+                               loopingState.addProceed(sinkState,
+                                       untilCondition != null
+                                               ? new 
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+                                               : proceedCondition);
+                               updateWithGreedyCondition(sinkState, 
getTakeCondition(currentPattern));
+                       } else {
+                               loopingState.addProceed(sinkState, 
proceedCondition);
+                       }
                        loopingState.addTake(takeCondition);
 
                        addStopStateToLooping(loopingState);
@@ -791,6 +846,15 @@ public class NFACompiler {
                        }
                        return trueCondition;
                }
+
+               private void updateWithGreedyCondition(
+                       State<T> state,
+                       IterativeCondition<T> takeCondition) {
+                       for (StateTransition<T> stateTransition : 
state.getStateTransitions()) {
+                               stateTransition.setCondition(
+                                       new 
AndCondition<>(stateTransition.getCondition(), new 
NotCondition<>(takeCondition)));
+                       }
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index adf1397..33574b3 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -312,6 +312,7 @@ public class Pattern<T, F extends T> {
         * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
         */
        public Pattern<T, F> optional() {
+               checkIfPreviousPatternGreedy();
                quantifier.optional();
                return this;
        }
@@ -338,6 +339,20 @@ public class Pattern<T, F extends T> {
        }
 
        /**
+        * Specifies that this pattern is greedy.
+        * This means as many events as possible will be matched to this 
pattern.
+        *
+        * @return The same pattern with {@link Quantifier#greedy} set to true.
+        * @throws MalformedPatternException if the quantifier is not 
applicable to this pattern.
+        */
+       public Pattern<T, F> greedy() {
+               checkIfNoNotPattern();
+               checkIfNoGroupPattern();
+               this.quantifier.greedy();
+               return this;
+       }
+
+       /**
         * Specifies exact number of times that this pattern should be matched.
         *
         * @param times number of times matching event must appear
@@ -509,4 +524,16 @@ public class Pattern<T, F extends T> {
                                        "Current quantifier is: " + quantifier);
                }
        }
+
+       private void checkIfNoGroupPattern() {
+               if (this instanceof GroupPattern) {
+                       throw new MalformedPatternException("Option not 
applicable to group pattern");
+               }
+       }
+
+       private void checkIfPreviousPatternGreedy() {
+               if (previous != null && 
previous.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+                       throw new MalformedPatternException("Optional pattern 
cannot be preceded by greedy pattern");
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 2136706..b55051d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -105,6 +105,15 @@ public class Quantifier {
                properties.add(Quantifier.QuantifierProperty.OPTIONAL);
        }
 
+       public void greedy() {
+               checkPattern(!(innerConsumingStrategy == 
ConsumingStrategy.SKIP_TILL_ANY),
+                       "Option not applicable to FollowedByAny pattern");
+               checkPattern(!hasProperty(Quantifier.QuantifierProperty.SINGLE),
+                       "Option not applicable to singleton quantifier");
+
+               properties.add(QuantifierProperty.GREEDY);
+       }
+
        @Override
        public boolean equals(Object o) {
                if (this == o) {
@@ -130,7 +139,8 @@ public class Quantifier {
                SINGLE,
                LOOPING,
                TIMES,
-               OPTIONAL
+               OPTIONAL,
+               GREEDY
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d21d5d63/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
new file mode 100644
index 0000000..2c7f23c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
@@ -0,0 +1,907 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+       @Test
+       public void testGreedyZeroOrMore() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               inputEvents.add(new StreamRecord<>(c, 1));
+               inputEvents.add(new StreamRecord<>(a1, 2));
+               inputEvents.add(new StreamRecord<>(a2, 3));
+               inputEvents.add(new StreamRecord<>(a3, 4));
+               inputEvents.add(new StreamRecord<>(d, 5));
+
+               // c a* d
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               ));
+       }
+
+       @Test
+       public void testGreedyZeroOrMoreInBetween() {
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               inputEvents.add(new StreamRecord<>(c, 1));
+               inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
2));
+               inputEvents.add(new StreamRecord<>(a1, 3));
+               inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
4));
+               inputEvents.add(new StreamRecord<>(a2, 5));
+               inputEvents.add(new StreamRecord<>(new Event(1, "dummy", 1111), 
6));
+               inputEvents.add(new StreamRecord<>(a3, 7));
+               inputEvents.add(new StreamRecord<>(d, 8));
+
+               // c a* d
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 
5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               ));
+       }
+
+       /**
+        * Pattern {@code c a* d} with a greedy {@code a*}; an unrelated "dummy" event
+        * arrives after the run of {@code a} events and before {@code d}.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, d]}.
+        */
+       @Test
+       public void testGreedyZeroOrMoreWithDummyEventsAfterQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, a1, a2, dummy, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
+               stream.add(new StreamRecord<>(d, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy zero-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().optional().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a* d} with a greedy {@code a*}; the stream contains no
+        * {@code a} event at all, only an unrelated "dummy" event between {@code c}
+        * and {@code d}.
+        *
+        * <p>Expected result: the single sequence {@code [c, d]}.
+        */
+       @Test
+       public void testGreedyZeroOrMoreWithDummyEventsBeforeQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, dummy, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
+               stream.add(new StreamRecord<>(d, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy zero-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().optional().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a* d} with a greedy {@code a*} that carries an
+        * {@code until(price > 3.0)} condition. The stream holds three {@code a}
+        * events priced at most 3.0, then an {@code a} event priced 4.0 (which
+        * satisfies the until condition), then {@code d}.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, a3, d]}.
+        */
+       @Test
+       public void testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 3.0);
+               Event a3 = new Event(43, "a", 3.0);
+               Event d = new Event(45, "d", 3.0);
+
+               // Input stream: c, a1, a2, a3, a(price 4.0), d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(a3, 4));
+               stream.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
+               stream.add(new StreamRecord<>(d, 6));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> priceAboveThree = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getPrice() > 3.0;
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy zero-or-more, until price > 3.0) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().optional().greedy().until(priceAboveThree)
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a* d} with a greedy {@code a*} that carries an
+        * {@code until(price > 3.0)} condition. Here the {@code a} event priced 4.0
+        * (which satisfies the until condition) arrives directly after {@code c},
+        * ahead of the three lower-priced {@code a} events.
+        *
+        * <p>Expected result: the single sequence {@code [c, d]}.
+        */
+       @Test
+       public void testGreedyUntilWithDummyEventsBeforeQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 3.0);
+               Event a3 = new Event(43, "a", 3.0);
+               Event d = new Event(45, "d", 3.0);
+
+               // Input stream: c, a(price 4.0), a1, a2, a3, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
+               stream.add(new StreamRecord<>(a1, 3));
+               stream.add(new StreamRecord<>(a2, 4));
+               stream.add(new StreamRecord<>(a3, 5));
+               stream.add(new StreamRecord<>(d, 6));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> priceAboveThree = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getPrice() > 3.0;
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy zero-or-more, until price > 3.0) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().optional().greedy().until(priceAboveThree)
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+}; the stream is exactly
+        * {@code c, a1, a2, a3, d} with no unrelated events.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, a3, d]}.
+        */
+       @Test
+       public void testGreedyOneOrMore() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, a1, a2, a3, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(a3, 4));
+               stream.add(new StreamRecord<>(d, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+}; unrelated "dummy" events
+        * are interleaved before and between the {@code a} events.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, a3, d]}.
+        */
+       @Test
+       public void testGreedyOneOrMoreInBetween() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, dummy, a1, dummy, a2, dummy, a3, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(new Event(1, "dummy", 1111), 2));
+               stream.add(new StreamRecord<>(a1, 3));
+               stream.add(new StreamRecord<>(new Event(1, "dummy", 1111), 4));
+               stream.add(new StreamRecord<>(a2, 5));
+               stream.add(new StreamRecord<>(new Event(1, "dummy", 1111), 6));
+               stream.add(new StreamRecord<>(a3, 7));
+               stream.add(new StreamRecord<>(d, 8));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+}; an unrelated "dummy" event
+        * arrives after the run of {@code a} events and before {@code d}.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, d]}.
+        */
+       @Test
+       public void testGreedyOneOrMoreWithDummyEventsAfterQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, a1, a2, dummy, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
+               stream.add(new StreamRecord<>(d, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+}; the stream contains no
+        * {@code a} event, only an unrelated "dummy" event between {@code c} and
+        * {@code d}.
+        *
+        * <p>Expected result: no match at all (empty result list).
+        */
+       @Test
+       public void testGreedyOneOrMoreWithDummyEventsBeforeQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event d = new Event(44, "d", 3.0);
+
+               // Input stream: c, dummy, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 2));
+               stream.add(new StreamRecord<>(d, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy()
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList();
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+} that carries an
+        * {@code until(price > 3.0)} condition. The stream holds three {@code a}
+        * events priced at most 3.0, then an {@code a} event priced 4.0 (which
+        * satisfies the until condition), then {@code d}.
+        *
+        * <p>Expected result: the single sequence {@code [c, a1, a2, a3, d]}.
+        */
+       @Test
+       public void testGreedyUntilOneOrMoreWithDummyEventsAfterQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 3.0);
+               Event a3 = new Event(43, "a", 3.0);
+               Event d = new Event(45, "d", 3.0);
+
+               // Input stream: c, a1, a2, a3, a(price 4.0), d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(a3, 4));
+               stream.add(new StreamRecord<>(new Event(44, "a", 4.0), 5));
+               stream.add(new StreamRecord<>(d, 6));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> priceAboveThree = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getPrice() > 3.0;
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more, until price > 3.0) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy().until(priceAboveThree)
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a+ d} with a greedy {@code a+} that carries an
+        * {@code until(price > 3.0)} condition. The {@code a} event priced 4.0
+        * (which satisfies the until condition) arrives directly after {@code c},
+        * ahead of the three lower-priced {@code a} events.
+        *
+        * <p>Expected result: no match at all (empty result list).
+        */
+       @Test
+       public void testGreedyUntilOneOrMoreWithDummyEventsBeforeQuantifier() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 3.0);
+               Event a3 = new Event(43, "a", 3.0);
+               Event d = new Event(45, "d", 3.0);
+
+               // Input stream: c, a(price 4.0), a1, a2, a3, d
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(new Event(44, "a", 4.0), 2));
+               stream.add(new StreamRecord<>(a1, 3));
+               stream.add(new StreamRecord<>(a2, 4));
+               stream.add(new StreamRecord<>(a3, 5));
+               stream.add(new StreamRecord<>(d, 6));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> priceAboveThree = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getPrice() > 3.0;
+                       }
+               };
+               SimpleCondition<Event> isD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // start(c) -> middle(a, greedy one-or-more, until price > 3.0) -> end(d)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("middle").where(isA).oneOrMore().greedy().until(priceAboveThree)
+                       .followedBy("end").where(isD);
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList();
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a* (d e){2} f} with a greedy {@code a*} placed in front of
+        * a group pattern that has to occur twice. An unrelated "dummy" event is
+        * interleaved between {@code a2} and {@code a3}.
+        *
+        * <p>Expected result: the single sequence
+        * {@code [c, a1, a2, a3, d1, e1, d2, e2, f]}.
+        */
+       @Test
+       public void testGreedyZeroOrMoreBeforeGroupPattern() {
+               // Every event gets its own id. With identical (id, name, price) triples the
+               // repeated a/d/e events could not be told apart in the final comparison
+               // (presumably Event equality is value-based -- confirm against Event#equals).
+               // The pattern conditions below only inspect the name, so ids do not affect matching.
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 1.0);
+               Event a2 = new Event(42, "a", 1.0);
+               Event a3 = new Event(44, "a", 1.0);
+               Event d1 = new Event(45, "d", 1.0);
+               Event e1 = new Event(46, "e", 1.0);
+               Event d2 = new Event(47, "d", 1.0);
+               Event e2 = new Event(48, "e", 1.0);
+               Event f = new Event(49, "f", 3.0);
+
+               // Input stream: c, a1, a2, dummy, a3, d1, e1, d2, e2, f
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               inputEvents.add(new StreamRecord<>(c, 1));
+               inputEvents.add(new StreamRecord<>(a1, 2));
+               inputEvents.add(new StreamRecord<>(a2, 3));
+               inputEvents.add(new StreamRecord<>(new Event(43, "dummy", 2.0), 4));
+               inputEvents.add(new StreamRecord<>(a3, 5));
+               inputEvents.add(new StreamRecord<>(d1, 6));
+               inputEvents.add(new StreamRecord<>(e1, 7));
+               inputEvents.add(new StreamRecord<>(d2, 8));
+               inputEvents.add(new StreamRecord<>(e2, 9));
+               inputEvents.add(new StreamRecord<>(f, 10));
+
+               // c a* (d e){2} f
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               }).oneOrMore().optional().greedy().followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               }).followedBy("middle2").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("e");
+                       }
+               })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("f");
+                       }
+               });
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+               compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2, a3, d1, e1, d2, e2, f)
+               ));
+       }
+
+       /**
+        * Pattern {@code c a*} where the greedy {@code a*} is the last stage of the
+        * pattern. An unrelated "dummy" event sits between {@code a2} and {@code a3}.
+        *
+        * <p>Expected result: four sequences, {@code [c]}, {@code [c, a1]},
+        * {@code [c, a1, a2]} and {@code [c, a1, a2, a3]}.
+        */
+       @Test
+       public void testEndWithZeroOrMoreGreedy() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+
+               // Input stream: c, a1, a2, dummy, a3
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 4));
+               stream.add(new StreamRecord<>(a3, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+
+               // start(c) -> end(a, greedy zero-or-more)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("end").where(isA).oneOrMore().optional().greedy();
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c),
+                       Lists.newArrayList(c, a1),
+                       Lists.newArrayList(c, a1, a2),
+                       Lists.newArrayList(c, a1, a2, a3)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a*} where the trailing {@code a*} is both
+        * {@code consecutive()} and {@code greedy()}. An unrelated "dummy" event sits
+        * between {@code a2} and {@code a3}.
+        *
+        * <p>Expected result: three sequences, {@code [c]}, {@code [c, a1]} and
+        * {@code [c, a1, a2]}; {@code a3} is not part of any expected match.
+        */
+       @Test
+       public void testEndWithZeroOrMoreConsecutiveGreedy() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+
+               // Input stream: c, a1, a2, dummy, a3
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 4));
+               stream.add(new StreamRecord<>(a3, 5));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+
+               // start(c) -> end(a, consecutive greedy zero-or-more)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("end").where(isA).oneOrMore().optional().consecutive().greedy();
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c),
+                       Lists.newArrayList(c, a1),
+                       Lists.newArrayList(c, a1, a2)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /**
+        * Pattern {@code c a{2, 5}} where the greedy {@code times(2, 5)} stage is the
+        * last stage of the pattern. The stream carries four {@code a} events followed
+        * by an unrelated "dummy" event.
+        *
+        * <p>Expected result: three sequences, {@code [c, a1, a2]},
+        * {@code [c, a1, a2, a3]} and {@code [c, a1, a2, a3, a4]}.
+        */
+       @Test
+       public void testEndWithGreedyTimesRange() {
+               Event c = new Event(40, "c", 1.0);
+               Event a1 = new Event(41, "a", 2.0);
+               Event a2 = new Event(42, "a", 2.0);
+               Event a3 = new Event(43, "a", 2.0);
+               Event a4 = new Event(44, "a", 2.0);
+
+               // Input stream: c, a1, a2, a3, a4, dummy
+               List<StreamRecord<Event>> stream = new ArrayList<>();
+               stream.add(new StreamRecord<>(c, 1));
+               stream.add(new StreamRecord<>(a1, 2));
+               stream.add(new StreamRecord<>(a2, 3));
+               stream.add(new StreamRecord<>(a3, 4));
+               stream.add(new StreamRecord<>(a4, 5));
+               stream.add(new StreamRecord<>(new Event(44, "dummy", 2.0), 6));
+
+               SimpleCondition<Event> isC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> isA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+
+               // start(c) -> end(a, greedy, between 2 and 5 times)
+               Pattern<Event, ?> greedyPattern = Pattern.<Event>begin("start").where(isC)
+                       .followedBy("end").where(isA).times(2, 5).greedy();
+
+               NFA<Event> compiled = NFACompiler.compile(greedyPattern, Event.createTypeSerializer(), false);
+
+               final List<List<Event>> matches = feedNFA(stream, compiled);
+
+               List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+                       Lists.newArrayList(c, a1, a2),
+                       Lists.newArrayList(c, a1, a2, a3),
+                       Lists.newArrayList(c, a1, a2, a3, a4)
+               );
+               compareMaps(matches, expected);
+       }
+
+       /** Greedy {@code times(2, 5)} in mid-pattern: expects one match holding all four "a" events, then "d". */
+       @Test
+       public void testGreedyTimesRange() {
+               final Event c = new Event(40, "c", 1.0);
+               final Event a1 = new Event(41, "a", 2.0);
+               final Event a2 = new Event(42, "a", 2.0);
+               final Event a3 = new Event(43, "a", 2.0);
+               final Event a4 = new Event(44, "a", 2.0);
+               final Event d = new Event(45, "d", 2.0);
+
+               // Timestamps run 1..6 in arrival order.
+               final Event[] arrivals = {c, a1, a2, a3, a4, d};
+               List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+               for (int i = 0; i < arrivals.length; i++) {
+                       inputEvents.add(new StreamRecord<>(arrivals[i], i + 1));
+               }
+
+               SimpleCondition<Event> nameIsC = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("c");
+                       }
+               };
+               SimpleCondition<Event> nameIsA = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("a");
+                       }
+               };
+               SimpleCondition<Event> nameIsD = new SimpleCondition<Event>() {
+                       private static final long serialVersionUID = 5726188262756267490L;
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("d");
+                       }
+               };
+
+               // c a{2, 5} d
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(nameIsC)
+                       .followedBy("middle").where(nameIsA).times(2, 5).greedy()
+                       .followedBy("end").where(nameIsD);
+
+               NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+               final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+               List<List<Event>> expected = new ArrayList<>();
+               expected.add(Lists.newArrayList(c, a1, a2, a3, a4, d));
+               compareMaps(resultingPatterns, expected);
+       }
+}

Reply via email to