[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16150436#comment-16150436 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4331

> Support AFTER MATCH SKIP function in CEP library API
>
>                 Key: FLINK-7169
>                 URL: https://issues.apache.org/jira/browse/FLINK-7169
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Yueting Chen
>            Assignee: Yueting Chen
>             Fix For: 1.4.0
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class that takes an
> additional `AfterMatchSkipStrategy` parameter.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is how
> the CEP library currently behaves.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
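The four options in the issue description can be read as rules for computing the row at which matching resumes. The following is a minimal sketch of that reading, not Flink code: it assumes rows are identified by their index in the input and that a match maps each row pattern variable to the indices it covers, in ascending order. `SkipOptionSketch`, `SkipOption`, `resumeIndex`, and the sample variables `A`, `B`, `C` are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkipOptionSketch {

    /** Hypothetical names for the four AFTER MATCH SKIP options. */
    enum SkipOption { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    /**
     * Returns the row index at which matching resumes after the given match.
     * Assumes the match is non-empty and each variable's rows are ascending.
     * The rpv argument is only used by TO_FIRST and TO_LAST.
     */
    static int resumeIndex(Map<String, List<Integer>> match, SkipOption option, String rpv) {
        int firstRow = Integer.MAX_VALUE;
        int lastRow = Integer.MIN_VALUE;
        for (List<Integer> rows : match.values()) {
            for (int row : rows) {
                firstRow = Math.min(firstRow, row);
                lastRow = Math.max(lastRow, row);
            }
        }
        switch (option) {
            case TO_NEXT_ROW:
                // the row after the first row of the current match
                return firstRow + 1;
            case PAST_LAST_ROW:
                // the row after the last row of the current match
                return lastRow + 1;
            case TO_FIRST:
                // the first row mapped to the variable
                return rowsOf(match, rpv).get(0);
            case TO_LAST:
                // the last row mapped to the variable
                List<Integer> mapped = rowsOf(match, rpv);
                return mapped.get(mapped.size() - 1);
            default:
                throw new IllegalArgumentException("Unknown option: " + option);
        }
    }

    private static List<Integer> rowsOf(Map<String, List<Integer>> match, String rpv) {
        List<Integer> rows = match.get(rpv);
        if (rows == null || rows.isEmpty()) {
            throw new IllegalArgumentException("No rows mapped to variable: " + rpv);
        }
        return rows;
    }

    public static void main(String[] args) {
        // A match covering rows 2..6: A -> row 2, B -> rows 3..5, C -> row 6
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("A", Arrays.asList(2));
        match.put("B", Arrays.asList(3, 4, 5));
        match.put("C", Arrays.asList(6));

        System.out.println(resumeIndex(match, SkipOption.TO_NEXT_ROW, null));   // 3
        System.out.println(resumeIndex(match, SkipOption.PAST_LAST_ROW, null)); // 7
        System.out.println(resumeIndex(match, SkipOption.TO_FIRST, "B"));       // 3
        System.out.println(resumeIndex(match, SkipOption.TO_LAST, "B"));        // 5
    }
}
```

Note that later review comments in this thread point out that the CEP implementation does not literally restart matching; the indices here only illustrate the semantics of the four options.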
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147903#comment-16147903 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331

    @dawidwys Thanks for the reviews and comments, please change the documentation during merge.
    And @kl0u, thanks for the reviews!
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147900#comment-16147900 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r136169035

    --- Diff: docs/dev/libs/cep.md ---
    @@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))

    +### After Match Skip Strategy
    +
    +For a given pattern, there can be many successful matches as data stream flows. In order to control how to restart the match process after a successful match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. There're four types of skip strategies, listed as follows:
    --- End diff --

    That sounds good to me, please change it during merge. Thanks a lot.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146868#comment-16146868 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4331

    I think you can merge it @dawidwys. I was following the evolution of this PR and I think it looks good ;).

    Thanks for the work, both @yestinchen and @dawidwys!
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146860#comment-16146860 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r135465794

    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -511,4 +530,5 @@ object Pattern {
         */
       def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
    --- End diff --

    A version of this method that takes an `AfterMatchSkipStrategy` is missing. I will change it during merge, if you are ok with it.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146861#comment-16146861 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r135465365

    --- Diff: docs/dev/libs/cep.md ---
    @@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))

    +### After Match Skip Strategy
    +
    +For a given pattern, there can be many successful matches as data stream flows. In order to control how to restart the match process after a successful match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. There're four types of skip strategies, listed as follows:
    --- End diff --

    That is not true. We do not restart the match process at all. It just controls which results are discarded. I will change it during merge, if you are ok with it.
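The reviewer's point, that the strategy only controls which results are discarded, can be illustrated with a small sketch. This is not the code from the pull request: it assumes events are plain strings, that each partial match is a list of the events it has consumed so far, and that the strategy has already produced a set of events to discard. `DiscardSketch` and `discardOverlapping` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DiscardSketch {

    /**
     * Keeps only the partial matches that do not contain any event from the
     * discard set. Matching itself is never restarted; overlapping partial
     * matches are simply dropped.
     */
    static List<List<String>> discardOverlapping(
            List<List<String>> partialMatches, Set<String> discardEvents) {
        List<List<String>> kept = new ArrayList<>();
        for (List<String> partial : partialMatches) {
            boolean overlaps = false;
            for (String event : partial) {
                if (discardEvents.contains(event)) {
                    overlaps = true;
                    break;
                }
            }
            if (!overlaps) {
                kept.add(partial);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Events of a completed match; under a "skip past last event" reading
        // (an assumption here) all of them would be discarded.
        Set<String> discardEvents = new HashSet<>(Arrays.asList("a1", "b1", "c1"));

        List<List<String>> partialMatches = Arrays.asList(
                Arrays.asList("a1", "b2"),  // shares a1 with the completed match
                Arrays.asList("b1"),        // shares b1 with the completed match
                Arrays.asList("a2", "b2")); // no overlap

        System.out.println(discardOverlapping(partialMatches, discardEvents)); // [[a2, b2]]
    }
}
```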
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145363#comment-16145363 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331

    Hi @dawidwys, sorry for the late response.
    Thanks for your reviews, I have updated the test and the document. Please take a look if you have time.
    Thanks.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134871#comment-16134871 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170320

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -707,7 +789,7 @@ private boolean checkFilterCondition(ComputationState computationState, Itera
                 result.put(key, values);
             }

    -            for (T event: events) {
    +            for (T event : events) {
    --- End diff --

    Unrelated change
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134870#comment-16134870 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170202

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -695,10 +778,9 @@ private boolean checkFilterCondition(ComputationState computationState, Itera
             }
             // for a given computation state, we cannot have more than one matching patterns.
             Preconditions.checkState(paths.size() == 1);
    -
    -        Mapresult = new HashMap<>();
    +        Map result = new LinkedHashMap<>();
             Map path = paths.get(0);
    -        for (String key: path.keySet()) {
    +        for (String key : path.keySet()) {
    --- End diff --

    Unrelated change
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134869#comment-16134869 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170147

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -990,7 +1072,7 @@ public void serialize(NFA record, DataOutputView target) throws IOException {
             boolean handleTimeout = source.readBoolean();

             NFA nfa = new NFA<>(eventSerializer, windowTime, handleTimeout);
    -        nfa.states = states;
    +        nfa.addStates(states);
    --- End diff --

    This change can result in multiple start states being added. In any case, it is an unrelated change.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134873#comment-16134873 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4331

    Still missing some tests:
    - ensuring that `NFA#extractCurrentMatches` returns patterns in order.
    - skip to first/last with `oneOrMore`

    Docs missing
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134872#comment-16134872 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170217

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -557,8 +640,8 @@ private boolean isSelfIgnore(final StateTransition edge) {
             if (computationState.isStartState()) {
                 int totalBranches = calculateIncreasingSelfState(
    -                outgoingEdges.getTotalIgnoreBranches(),
    -                outgoingEdges.getTotalTakeBranches());
    +                    outgoingEdges.getTotalIgnoreBranches(),
    --- End diff --

    Unrelated change
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130168#comment-16130168 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133667042

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
             return Tuple2.of(result, timeoutResult);
         }

    +    private void discardComputationStatesAccordingToStrategy(QueuecomputationStates,
    --- End diff --

    The changes in the next commit are not enough. The `Map` type should also be changed in `SharedBuffer#extractPatterns`. Also, please provide tests for this behaviour (returning results in Pattern order), so that it cannot be changed by mistake.
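The request to change the `Map` type and to test that results come back in pattern order matters whenever code walks a match entry by entry until it reaches a named pattern. A minimal sketch of that dependency, assuming a match is a map from pattern name to its events and that events before the named pattern are the ones to discard; `PatternOrderSketch` and `eventsBeforeFirst` are hypothetical names, not methods from the pull request:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PatternOrderSketch {

    /**
     * Collects all events that precede the first event of the named pattern.
     * This is only correct if the map iterates in pattern order, which
     * LinkedHashMap guarantees (insertion order) and HashMap does not.
     */
    static List<String> eventsBeforeFirst(Map<String, List<String>> match, String patternName) {
        List<String> discard = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : match.entrySet()) {
            if (entry.getKey().equals(patternName)) {
                return discard;
            }
            discard.addAll(entry.getValue());
        }
        throw new IllegalArgumentException("Unknown pattern name: " + patternName);
    }

    public static void main(String[] args) {
        // Entries are inserted in pattern order: start, middle, end.
        Map<String, List<String>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList("a1"));
        match.put("middle", Arrays.asList("b1", "b2"));
        match.put("end", Arrays.asList("c1"));

        System.out.println(eventsBeforeFirst(match, "middle")); // [a1]
        System.out.println(eventsBeforeFirst(match, "end"));    // [a1, b1, b2]
    }
}
```

With a plain `HashMap` the iteration order is unspecified, so the same walk could stop too early or too late; a test that asserts the key order would catch such a regression.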
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128921#comment-16128921 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133478539

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
             return Tuple2.of(result, timeoutResult);
         }

    +    private void discardComputationStatesAccordingToStrategy(QueuecomputationStates,
    --- End diff --

    You are absolutely right. Thanks for the tip.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128920#comment-16128920 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133478273

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,29 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    --- End diff --

    We only need to check the skip strategy before compiling the `Pattern` to an `NFA`, so I think it is more reasonable to place the check here. We also need to check whether the `patternName` field in the `AfterMatchSkipStrategy` is a valid reference, which cannot be done easily in the `Pattern` class.
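The check described in this comment, that `patternName` must refer to a pattern that actually exists, can be sketched as follows. This is a hypothetical illustration rather than the `NFACompiler` code: the strategy names are taken from the Javadoc quoted elsewhere in this thread, while `SkipStrategyCheckSketch`, `checkSkipStrategy`, and the `definedNames` parameter are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SkipStrategyCheckSketch {

    enum Strategy {
        SKIP_TO_NEXT_EVENT,
        SKIP_PAST_LAST_EVENT,
        SKIP_TO_FIRST_PATTERN,
        SKIP_TO_LAST_PATTERN
    }

    /**
     * Rejects a skip strategy whose pattern name does not refer to one of the
     * names defined in the pattern sequence. Only the two "skip to pattern"
     * strategies need a name; the other two ignore it.
     */
    static void checkSkipStrategy(
            Strategy strategy, String patternName, Collection<String> definedNames) {
        boolean needsName = strategy == Strategy.SKIP_TO_FIRST_PATTERN
                || strategy == Strategy.SKIP_TO_LAST_PATTERN;
        if (needsName && (patternName == null || !definedNames.contains(patternName))) {
            throw new IllegalArgumentException(
                    "Skip strategy " + strategy + " refers to unknown pattern: " + patternName);
        }
    }

    public static void main(String[] args) {
        List<String> definedNames = Arrays.asList("start", "middle", "end");

        // Valid reference: passes silently.
        checkSkipStrategy(Strategy.SKIP_TO_LAST_PATTERN, "middle", definedNames);

        // Invalid reference: rejected before any matching would start.
        try {
            checkSkipStrategy(Strategy.SKIP_TO_FIRST_PATTERN, "missing", definedNames);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check once, at the point where all pattern names are known, keeps the failure at job construction time instead of at runtime.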
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126650#comment-16126650 ] ASF GitHub Bot commented on FLINK-7169: --- Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r133091608 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -340,6 +362,65 @@ public void resetNFAChanged() { return Tuple2.of(result, timeoutResult); } + private void discardComputationStatesAccordingToStrategy(QueuecomputationStates, + Collection
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125389#comment-16125389 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895649

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN.
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +    // pattern name to skip to
    +    String patternName = null;
    +
    --- End diff --

    This can be `private`.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125388#comment-16125388 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896369

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---
    @@ -42,20 +43,21 @@
         private static final long serialVersionUID = 3570542177814518158L;

         public TimeoutKeyedCEPPatternOperator(
    -            TypeSerializer inputSerializer,
    -            boolean isProcessingTime,
    -            TypeSerializer keySerializer,
    -            NFACompiler.NFAFactory nfaFactory,
    -            boolean migratingFromOldKeyedOperator,
    -            EventComparator comparator) {
    +        TypeSerializer inputSerializer,
    --- End diff --

    Please revert the unrelated formatting changes.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125387#comment-16125387 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132894559

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -96,7 +97,8 @@
             } else {
                 final NFAFactoryCompiler nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
                 nfaFactoryCompiler.compileFactory();
    -            return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
    +            return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(),
    +                nfaFactoryCompiler.getStates(), timeoutHandling);
             }
    --- End diff --

    Unrelated formatting change. Please revert.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125386#comment-16125386 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896299

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---
    @@ -42,28 +43,29 @@
         private static final long serialVersionUID = 5328573789532074581L;

         public KeyedCEPPatternOperator(
    -            TypeSerializer inputSerializer,
    -            boolean isProcessingTime,
    -            TypeSerializer keySerializer,
    -            NFACompiler.NFAFactory nfaFactory,
    -            boolean migratingFromOldKeyedOperator,
    -            EventComparator comparator) {
    +        TypeSerializer inputSerializer,
    --- End diff --

    Please revert the unrelated formatting changes.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125384#comment-16125384 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895639

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN.
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    --- End diff --

    This can be `private`.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125385#comment-16125385 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896150

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -120,13 +121,16 @@
         private final EventComparator comparator;

    +    final AfterMatchSkipStrategy afterMatchSkipStrategy;
    +
         public AbstractKeyedCEPPatternOperator(
    -            final TypeSerializer inputSerializer,
    --- End diff --

    Please revert the formatting changes (tabs).
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125378#comment-16125378 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895143

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,29 @@ long getWindowTime() {
         }

         /**
    +     * Check pattern after match skip strategy.
    +     */
    +    private void checkPatternSkipStrategy() {
    +        if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +            afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +            Pattern<T, ?> pattern = currentPattern;
    +            while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +                if (pattern.getPrevious() == null) {
    +                    break;
    +                } else {
    +                    pattern = pattern.getPrevious();
    +                }
    +            }
    +
    --- End diff --

    The above can become:

    ```
    while (pattern.getPrevious() != null && !pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
        pattern = pattern.getPrevious();
    }
    ```

    right?
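To make the equivalence of the two loop forms in the review comment above easier to check, here is a small, self-contained sketch. `ChainNode` and `LoopForms` are hypothetical names introduced only for this illustration; they are not part of Flink, and only `getName()` / `getPrevious()` mirror the quoted diff.

```java
/** Hypothetical stand-in for one element of a pattern chain; not Flink's Pattern class. */
final class ChainNode {
    private final String name;
    private final ChainNode previous;

    ChainNode(String name, ChainNode previous) {
        this.name = name;
        this.previous = previous; // null for the first pattern in the chain
    }

    String getName() {
        return name;
    }

    ChainNode getPrevious() {
        return previous;
    }
}

final class LoopForms {
    private LoopForms() {}

    /** Loop in the shape quoted from the diff: break once the head of the chain is reached. */
    static ChainNode walkWithBreak(ChainNode start, String target) {
        ChainNode pattern = start;
        while (!pattern.getName().equals(target)) {
            if (pattern.getPrevious() == null) {
                break;
            } else {
                pattern = pattern.getPrevious();
            }
        }
        return pattern;
    }

    /** Loop in the shape suggested in the review comment. */
    static ChainNode walkCompact(ChainNode start, String target) {
        ChainNode pattern = start;
        while (pattern.getPrevious() != null && !pattern.getName().equals(target)) {
            pattern = pattern.getPrevious();
        }
        return pattern;
    }
}
```

Both forms stop either at the node whose name equals the target or at the head of the chain, so a caller still has to compare the name of the returned node with the target to tell the two cases apart.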
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125383#comment-16125383 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132898814

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
             return Tuple2.of(result, timeoutResult);
         }

    +    private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    --- End diff --

    This method is very fragile because it assumes that the different patterns appear in the match in the order in which they were declared in the pattern declaration, e.g. for a pattern `A -> B -> C` it assumes that the partial matches will appear in the order [`matches for A`, `matches for B`, `matches for C`].

    This is *NOT* correct, as we use a `HashMap`, which does not provide any ordering guarantees. A quick fix would be to use a `LinkedHashMap` as the return type of `extractCurrentMatches` and put the results there in the correct order.
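The ordering point in the review comment above can be illustrated with plain `java.util` collections. This is a minimal sketch, not Flink code: `OrderedMatchDemo`, the pattern names and the event labels are made up for the example. It relies only on the documented behaviour that `LinkedHashMap` iterates in insertion order while `HashMap` gives no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo class; models a match as pattern name -> matched events. */
final class OrderedMatchDemo {
    private OrderedMatchDemo() {}

    /** Builds a match whose entries are inserted in declaration order of the patterns. */
    static Map<String, List<String>> orderedMatch() {
        // LinkedHashMap keeps insertion order when iterating; HashMap makes no such promise.
        Map<String, List<String>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList("e1"));
        match.put("middle", Arrays.asList("e2", "e3"));
        match.put("end", Arrays.asList("e4"));
        return match;
    }

    /** Returns the pattern names in the order in which the map iterates over them. */
    static List<String> patternOrder(Map<String, List<String>> match) {
        return new ArrayList<>(match.keySet());
    }
}
```

Code that walks the entries of such a map can then depend on seeing `start`, `middle`, `end` in that order, provided the producer inserted them in that order.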
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125381#comment-16125381 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132894611

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -117,9 +119,11 @@
         private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>();
         private Pattern<T, ?> currentPattern;
         private Pattern<T, ?> followingPattern;
    +    private AfterMatchSkipStrategy afterMatchSkipStrategy;
    --- End diff --

    This can be `final`.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125390#comment-16125390 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895822

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN.
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +    // pattern name to skip to
    +    String patternName = null;
    +
    +    /**
    +     * Skip to first *pattern*.
    +     * @param patternName the pattern name to skip to
    +     * @return
    --- End diff --

    The `@return` javadoc is missing on all of the methods. Also add a brief description of what each one implies.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125377#comment-16125377 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895590

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN.
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    --- End diff --

    Please add a `serialVersionUID`, e.g.:

    `private static final long serialVersionUID = -3601462998929198774L;`
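For readers unfamiliar with the request above, the following sketch shows a serializable value class that declares an explicit `serialVersionUID` and a Java-serialization round trip through it. `SkipSetting` and `SerializationRoundTrip` are hypothetical names used only here, and the id value `1L` is arbitrary; none of this is taken from the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical serializable value class; not Flink's AfterMatchSkipStrategy. */
final class SkipSetting implements Serializable {
    // Explicit version id: without it the JVM derives one from the class shape,
    // so unrelated edits to the class can make previously written bytes unreadable.
    private static final long serialVersionUID = 1L;

    private final String patternName;

    SkipSetting(String patternName) {
        this.patternName = patternName;
    }

    String getPatternName() {
        return patternName;
    }
}

final class SerializationRoundTrip {
    private SerializationRoundTrip() {}

    /** Serializes the given object to bytes and reads it back as a new instance. */
    static SkipSetting copy(SkipSetting original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SkipSetting) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```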
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125379#comment-16125379 ] ASF GitHub Bot commented on FLINK-7169: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132895280 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -150,6 +160,29 @@ long getWindowTime() { } /** +* Check pattern after match skip strategy. +*/ --- End diff -- Could this whole check go in the `Pattern` class? This will make the code clearer. > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Yueting Chen >Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support AFTER MATCH SKIP function in CEP API. > There're four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the > first row of the current match. > 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row > after the last row of the current match. > 3. AFTER MATCH SKIP TO FIST *RPV*: resume pattern matching at the first row > that is mapped to the row pattern variable RPV. > 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row > that is mapped to the row pattern variable RPV. > I think we can introduce a new function to `CEP` class, which takes a new > parameter as AfterMatchSKipStrategy. > The new API may looks like this > {code} > public static PatternStream pattern(DataStream input, Pattern> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) > {code} > We can also make `SKIP TO NEXT ROW` as the default option, because that's > what CEP library behaves currently. 
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125382#comment-16125382 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895977

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN.
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +    // pattern name to skip to
    +    String patternName = null;
    +
    +    /**
    +     * Skip to first *pattern*.
    +     * @param patternName the pattern name to skip to
    +     * @return
    +     */
    +    public static AfterMatchSkipStrategy skipToFirst(String patternName) {
    +        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
    +    }
    +
    +    /**
    +     * Skip to last *pattern*.
    +     * @param patternName the pattern name to skip to
    +     * @return
    +     */
    +    public static AfterMatchSkipStrategy skipToLast(String patternName) {
    +        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
    +    }
    +
    +    /**
    +     * Skip past last event.
    +     * @return
    +     */
    +    public static AfterMatchSkipStrategy skipPastLastEvent() {
    +        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
    +    }
    +
    +    /**
    +     * Skip to next event.
    +     * @return
    +     */
    +    public static AfterMatchSkipStrategy skipToNextEvent() {
    +        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
    +    }
    +
    +    private AfterMatchSkipStrategy(SkipStrategy strategy) {
    +        this(strategy, null);
    +    }
    +
    +    private AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    +        if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
    +            throw new IllegalArgumentException("The patternName field can not be empty when SkipStrategy is " + strategy);
    +        }
    +        this.strategy = strategy;
    +        this.patternName = patternName;
    +    }
    +
    +    public SkipStrategy getStrategy() {
    +        return strategy;
    +    }
    +
    +    public String getPatternName() {
    +        return patternName;
    +    }
    +
    +    @Override
    +    public String toString() {
    --- End diff --

    If `patternName == null` then there is nothing to print, so it would be nice to adjust the `toString()` accordingly.

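One way to act on the `toString()` remark is sketched below. This is a reduced, hypothetical stand-in for `AfterMatchSkipStrategy`: the class name `SkipStrategySketch` and the output format are assumptions, since the quoted diff stops at the method signature. It only illustrates printing the pattern name when one is set.

```java
import java.util.Objects;

// Hypothetical stand-in for AfterMatchSkipStrategy, reduced to the two fields shown in the diff.
final class SkipStrategySketch {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    private final String patternName; // null for strategies that take no pattern name

    SkipStrategySketch(SkipStrategy strategy, String patternName) {
        this.strategy = Objects.requireNonNull(strategy);
        this.patternName = patternName;
    }

    @Override
    public String toString() {
        // Assumed format: the bare strategy name, or "STRATEGY[patternName]" when a name is set.
        if (patternName == null) {
            return strategy.name();
        }
        return strategy.name() + "[" + patternName + "]";
    }
}
```

With this shape, a strategy without a pattern name prints only its enum constant, so no `null` ever appears in log output.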
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125392#comment-16125392 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896183

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -120,13 +121,16 @@
         private final EventComparator comparator;

    +    final AfterMatchSkipStrategy afterMatchSkipStrategy;
    --- End diff --

    This can be `protected`.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125380#comment-16125380 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895583

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    --- End diff --

    You could also add a brief description in the class javadoc for each of the strategies.

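The four options from the issue description can be made concrete with a small worked example. The sketch below is illustrative only and is not taken from the pull request: the class `AfterMatchSkipSketch`, the `Matched` type, and `resumeIndex` are hypothetical names, and throwing when no event is mapped to the requested pattern is a choice made for the sketch, not a behavior the thread settles on.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: none of these names come from the Flink CEP code base.
final class AfterMatchSkipSketch {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /** One event of a match: its position in the input and the pattern it was mapped to. */
    static final class Matched {
        final int index;
        final String patternName;

        Matched(int index, String patternName) {
            this.index = index;
            this.patternName = patternName;
        }
    }

    /** Returns the input position at which matching resumes after the given match. */
    static int resumeIndex(List<Matched> match, Strategy strategy, String patternName) {
        switch (strategy) {
            case SKIP_TO_NEXT_EVENT:
                // the row after the first row of the current match
                return match.get(0).index + 1;
            case SKIP_PAST_LAST_EVENT:
                // the row after the last row of the current match
                return match.get(match.size() - 1).index + 1;
            case SKIP_TO_FIRST:
                // the first row mapped to the named pattern
                for (Matched m : match) {
                    if (m.patternName.equals(patternName)) {
                        return m.index;
                    }
                }
                break;
            case SKIP_TO_LAST:
                // the last row mapped to the named pattern
                for (int i = match.size() - 1; i >= 0; i--) {
                    if (match.get(i).patternName.equals(patternName)) {
                        return match.get(i).index;
                    }
                }
                break;
        }
        throw new IllegalArgumentException("no event in the match is mapped to " + patternName);
    }

    /** Sample match: input positions 0..3 mapped to patterns a, b, b, c. */
    static List<Matched> sampleMatch() {
        return Arrays.asList(
            new Matched(0, "a"), new Matched(1, "b"), new Matched(2, "b"), new Matched(3, "c"));
    }
}
```

For the sample match, matching would resume at position 1 for `SKIP_TO_NEXT_EVENT`, at 4 for `SKIP_PAST_LAST_EVENT`, at 1 for `SKIP_TO_FIRST` on `b`, and at 2 for `SKIP_TO_LAST` on `b`.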
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125391#comment-16125391 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132897107

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -158,9 +158,9 @@
         private boolean nfaChanged;

         public NFA(
    -            final TypeSerializer eventSerializer,
    -            final long windowTime,
    -            final boolean handleTimeout) {
    +        final TypeSerializer eventSerializer,
    --- End diff --

    Please revert the unrelated formatting change.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125189#comment-16125189 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132867483

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
             return Tuple2.of(result, timeoutResult);
         }

    +    private void discardComputationStatesAccordingToStrategy(QueuecomputationStates,
    +        Collection
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124622#comment-16124622 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331

    Thanks for your reviews @dawidwys! I'll update the doc in the following commits.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16123249#comment-16123249 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132669361

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -859,7 +940,7 @@ public boolean apply(@Nullable State input) {
          */
         public static final class NFASerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    --- End diff --

    Revert the changes in serializer.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121220#comment-16121220 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132386948

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    +        private void checkPatternSkipStrategy() {
    +            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +                Pattern<T, ?> pattern = currentPattern;
    +                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +                    pattern = pattern.getPrevious();
    +                }
    +                // pattern name match check.
    +                if (pattern == null) {
    +                    throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +                        "can not be found in the given Pattern");
    +                } else {
    +                    // can not be used with optional states.
    +                    if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +                        throw new MalformedPatternException("the AfterMatchSkipStrategy " +
    +                            afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +                    }
    +                }
    +
    +                // start position check.
    +                if (pattern.getPrevious() == null) {
    --- End diff --

    Great, I'll just remove all those optional state checks.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121203#comment-16121203 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132384194

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    +        private void checkPatternSkipStrategy() {
    +            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +                Pattern<T, ?> pattern = currentPattern;
    +                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +                    pattern = pattern.getPrevious();
    +                }
    +                // pattern name match check.
    +                if (pattern == null) {
    +                    throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +                        "can not be found in the given Pattern");
    +                } else {
    +                    // can not be used with optional states.
    +                    if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +                        throw new MalformedPatternException("the AfterMatchSkipStrategy " +
    +                            afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +                    }
    +                }
    +
    +                // start position check.
    +                if (pattern.getPrevious() == null) {
    --- End diff --

    I personally see no reason for a semantic with RuntimeException. I can't think of any use case for it. Maybe let's finish this PR without the switch and exceptions and open a JIRA for the switch, ideally with some use cases for that semantic, so we can further agree on that and see if anyone needs it.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120938#comment-16120938 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132349108

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +    // fields
    +    String patternName = null;
    +
    +    public AfterMatchSkipStrategy(){
    +        this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +        this(strategy, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    --- End diff --

    Good idea, I like that.

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120936#comment-16120936 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132348950

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    +        private void checkPatternSkipStrategy() {
    +            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +                Pattern<T, ?> pattern = currentPattern;
    +                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +                    pattern = pattern.getPrevious();
    +                }
    +                // pattern name match check.
    +                if (pattern == null) {
    +                    throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +                        "can not be found in the given Pattern");
    +                } else {
    +                    // can not be used with optional states.
    +                    if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +                        throw new MalformedPatternException("the AfterMatchSkipStrategy " +
    +                            afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +                    }
    +                }
    +
    +                // start position check.
    +                if (pattern.getPrevious() == null) {
    --- End diff --

    I agree with you that the fallback approach is much easier to understand and maintain. If we discard nothing, the actual semantics is to use SKIP_TO_NEXT_EVENT for the next match process. But it will have an impact on matching semantics, which may lead to incorrect results. I think users should be aware of what happens.

    My original thought was to add a configuration switch, to let users choose between throwing exceptions and falling back to a default skip strategy. Do you have any ideas about that?

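A note on the name lookup in the quoted diff: the `while` loop calls `pattern.getName()` before the `pattern == null` check, so a name that matches no pattern would walk off the start of the chain and dereference `null` rather than reach the `MalformedPatternException`. A null-guarded traversal could look roughly like the sketch below. `PatternLookupSketch` and `Node` are hypothetical stand-ins rather than Flink's `Pattern` class, and returning an `Optional` so the caller can decide between an exception and a fallback is an assumption made for the sketch.

```java
import java.util.Optional;

// Illustrative only: Node is a hypothetical stand-in for a pattern chain, not Flink's Pattern class.
final class PatternLookupSketch {

    /** Minimal chain element, linked from the last pattern back to the first. */
    static final class Node {
        private final String name;
        private final Node previous;

        Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }

        String getName() {
            return name;
        }

        Node getPrevious() {
            return previous;
        }
    }

    /** Walks the chain backwards; the null guard runs before getName() is ever called. */
    static Optional<Node> findByName(Node last, String name) {
        Node current = last;
        while (current != null && !current.getName().equals(name)) {
            current = current.getPrevious();
        }
        return Optional.ofNullable(current);
    }

    /** Sample chain a -> b -> c, returned by its last element. */
    static Node sampleChain() {
        Node a = new Node("a", null);
        Node b = new Node("b", a);
        return new Node("c", b);
    }
}
```

An empty result leaves the policy to the caller: it can throw, or it can discard nothing, which per the comment above amounts to `SKIP_TO_NEXT_EVENT` behavior.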
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119867#comment-16119867 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132174353

--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

Switch the test to be similar to `NFAITCase`, so that it tests a smaller part of the library.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119869#comment-16119869 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175179

--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
+
+    private String resultPath;
+    private String expected;
+
+    private String lateEventPath;
+    private String expectedLateEvents;
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+        expected = "";
+
+        lateEventPath = tempFolder.newFile().toURI().toString();
+        expectedLateEvents = "";
+    }
+
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+        compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
+    }
+
+    private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) {
+        return new PatternSelectFunction<Event, String>() {
+
+            @Override
+            public String select(Map<String, List<Event>> pattern) {
+                StringBuilder builder = new StringBuilder();
+                for (String name: names) {
+                    for (Event e : pattern.get(name)) {
+                        builder.append(e.getId()).append(",");
+                    }
+                }
+                return builder.toString();
+            }
+        };
+    }
+
+    @Test
+    public void testSkipToNext() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Event> input = env.fromElements(
+            new Event(1, "a", 0.0),
+            new Event(2, "a", 0.0),
+            new Event(3, "a", 0.0),
+            new Event(4, "a", 0.0),
+            new Event(5, "a", 0.0),
+            new Event(6, "a", 0.0)
+        );
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
+            new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+            .where(new SimpleCondition<Event>() {
+
+                @Override
+                public boolean filter(Event value) throws Exception {
+                    return value.getName().equals("a");
+                }
+            }).times(3);
+        DataStream<String> result = CEP.pattern(input,
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119853#comment-16119853 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132173777

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -150,6 +160,59 @@ long getWindowTime() {
         }

         /**
+         * Check pattern after match skip strategy.
+         */
+        private void checkPatternSkipStrategy() {
+            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+                Pattern<T, ?> pattern = currentPattern;
+                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                    pattern = pattern.getPrevious();
+                }
+                // pattern name match check.
+                if (pattern == null) {
+                    throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                        "can not be found in the given Pattern");
+                } else {
+                    // can not be used with optional states.
+                    if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
--- End diff --

We could also allow this. See comment below.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119860#comment-16119860 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175300

--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119861#comment-16119861 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132137487

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -157,15 +157,28 @@
      */
     private boolean nfaChanged;

+    /**
+     * Store the skip strategy.
+     */
+    private AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+    public NFA(
--- End diff --

I don't see the point of creating another ctor.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119864#comment-16119864 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132136797

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ *
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+    // default strategy
+    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+    // fields
+    String patternName = null;
+
+    public AfterMatchSkipStrategy(){
+        this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy) {
+        this(strategy, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
+        if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
+            throw new IllegalArgumentException("the patternName field can not be empty when SkipStrategy is " + strategy);
+        }
+        this.strategy = strategy;
+        this.patternName = patternName;
+    }
+
+    public SkipStrategy getStrategy() {
+        return strategy;
+    }
+
+    public String getPatternName() {
+        return patternName;
+    }
+
+    @Override
+    public String toString() {
+        return "AfterMatchStrategy{" +
+            "strategy=" + strategy +
+            ", patternName=" + patternName +
+            '}';
+    }
+
+    /**
+     * Skip Strategy Enum.
+     */
+    public enum SkipStrategy{
+        SKIP_TO_NEXT_EVENT,
+        SKIP_PAST_LAST_EVENT,
+        SKIP_TO_FIRST,
+        SKIP_TO_LAST
+    }
+
+    /**
+     * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+     */
+    public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+        private static final int VERSION = 1;
+
+        /**
+         * This empty constructor is required for deserializing the configuration.
+         */
+        public AfterMatchSkipStrategyConfigSnapshot() {
+        }
+
+        public
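To make the four strategies concrete, here is a small sketch that computes where matching would resume after a match, following the row-based wording of the issue description. It is an illustration, not the pull request's implementation: `ResumePointSketch` and `resumeAt` are hypothetical names, events are represented only by their stream positions, and the match is assumed to be non-empty and to contain the named pattern.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ResumePointSketch {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream position at which matching resumes.
     * The match maps each pattern name to the positions it matched, in stream order.
     * patternName is only read for SKIP_TO_FIRST and SKIP_TO_LAST.
     */
    static int resumeAt(Map<String, List<Integer>> match, SkipStrategy strategy, String patternName) {
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        for (List<Integer> positions : match.values()) {
            for (int position : positions) {
                first = Math.min(first, position);
                last = Math.max(last, position);
            }
        }
        switch (strategy) {
            case SKIP_TO_NEXT_EVENT:
                return first + 1; // the event after the first event of the match
            case SKIP_PAST_LAST_EVENT:
                return last + 1;  // the event after the last event of the match
            case SKIP_TO_FIRST:
                return match.get(patternName).get(0);
            case SKIP_TO_LAST:
                List<Integer> mapped = match.get(patternName);
                return mapped.get(mapped.size() - 1);
            default:
                throw new IllegalArgumentException("unknown strategy " + strategy);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("a", Arrays.asList(0, 1));
        match.put("b", Arrays.asList(2, 3));
        match.put("c", Arrays.asList(4));
        for (SkipStrategy strategy : SkipStrategy.values()) {
            System.out.println(strategy + " -> " + resumeAt(match, strategy, "b"));
        }
    }
}
```

For the match `a=[0, 1]`, `b=[2, 3]`, `c=[4]` with target pattern `b`, the resume positions are 1, 5, 2 and 3 respectively.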
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119855#comment-16119855 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132173699

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -150,6 +160,59 @@ long getWindowTime() {
         }

         /**
+         * Check pattern after match skip strategy.
+         */
+        private void checkPatternSkipStrategy() {
+            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+                Pattern<T, ?> pattern = currentPattern;
+                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                    pattern = pattern.getPrevious();
+                }
+                // pattern name match check.
+                if (pattern == null) {
+                    throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                        "can not be found in the given Pattern");
+                } else {
+                    // can not be used with optional states.
+                    if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                        throw new MalformedPatternException("the AfterMatchSkipStrategy " +
+                            afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
+                    }
+                }
+
+                // start position check.
+                if (pattern.getPrevious() == null) {
--- End diff --

I had second thoughts on all those cases, and I think you had a good point with one of the previous proposals: just fall back to `SKIP_TO_NEXT_EVENT`. Maybe let's allow all those situations, but if we cannot discard anything within a match (e.g. the skip is to the first pattern, or the optional event is not present), then we discard nothing. The advantage, in my opinion, is that this is easier to understand than maintaining all those cases. (As a side note, I think the exceptions SQL throws in all those cases are just an implementation detail, as it keeps a single partial match at a time.) What do you think?
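The "discard nothing" fallback can be sketched roughly as follows for the skip-to-first case. This is only an illustration under stated assumptions: `LenientSkipSketch` and `eventsToDiscard` are hypothetical names, and the match is assumed to be an insertion-ordered map (for example a `LinkedHashMap`) whose keys follow the order of the pattern.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LenientSkipSketch {

    /**
     * Returns the events that precede the first event mapped to targetName.
     * Instead of throwing, it returns an empty set when there is nothing to
     * discard: the target is absent from the match (for example an optional
     * pattern that did not occur) or the target is the first pattern.
     */
    static <T> Set<T> eventsToDiscard(Map<String, List<T>> match, String targetName) {
        List<T> target = match.get(targetName);
        if (target == null || target.isEmpty()) {
            return Collections.emptySet();
        }
        Set<T> discard = new LinkedHashSet<>();
        for (Map.Entry<String, List<T>> entry : match.entrySet()) {
            if (entry.getKey().equals(targetName)) {
                break; // everything from the target onwards is kept
            }
            discard.addAll(entry.getValue());
        }
        return discard;
    }
}
```

With a match `start=[1, 2]`, `middle=[3]`, `end=[4, 5]`, skipping to `end` discards 1, 2 and 3, while skipping to `start` or to a name that is not in the match discards nothing, so no compile-time check on the pattern is needed for these cases.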
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119857#comment-16119857 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132166483

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -157,15 +157,28 @@
      */
     private boolean nfaChanged;

+    /**
+     * Store the skip strategy.
+     */
+    private AfterMatchSkipStrategy afterMatchSkipStrategy;
--- End diff --

I would not add it to `NFA`. I think it is static data that should not be serialized with the `NFA` each time. We already have problems removing `State`s, `Condition`s and `handleTimeout` from what is serialized within `NFA`, and it also makes maintaining serialization compatibility hard. How about adding it to the `NFAFactory` and passing it to `NFA#process` (the only place where it is needed)? I know it will require passing it through multiple places, e.g. `KeyedCEPPatternOperator` and `TimeoutKeyedCEPPatternOperator`, but it will be easier after #4320.
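The suggestion to keep the strategy out of the serialized `NFA` and hand it to `process` can be illustrated with a toy model. All types below (`SketchNfa`, `SketchNfaFactory`, `SerializationCheck`) are hypothetical stand-ins, not Flink classes, and plain Java serialization is used only to show what ends up in the serialized bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Strategy names as used in the pull request. */
enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

/** Stand-in for the NFA: only mutable matching state lives here. */
final class SketchNfa implements Serializable {
    private static final long serialVersionUID = 1L;
    private int processedEvents;

    // The strategy arrives as an argument, so it is never written with the NFA state.
    String process(String event, SkipStrategy strategy) {
        processedEvents++;
        return event + "@" + processedEvents + " using " + strategy;
    }
}

/** Stand-in for the factory: holds the static configuration and creates NFAs. */
final class SketchNfaFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SkipStrategy strategy;

    SketchNfaFactory(SkipStrategy strategy) {
        this.strategy = strategy;
    }

    SketchNfa createNfa() {
        return new SketchNfa();
    }

    SkipStrategy getStrategy() {
        return strategy;
    }
}

final class SerializationCheck {
    /** Serializes a value with plain Java serialization and returns the bytes. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

An operator would keep the factory (serialized once with the job), call `factory.createNfa()` per key, and pass `factory.getStrategy()` on every `process` call. The per-key state written on each checkpoint then contains no trace of the strategy.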
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119863#comment-16119863 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175333

--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119862#comment-16119862 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132139143

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -322,6 +336,64 @@ public void resetNFAChanged() {
         }

+        Set<T> discardEvents = new HashSet<>();
+        switch(afterMatchSkipStrategy.getStrategy()) {
+            case SKIP_TO_LAST:
+                for (Map<String, List<T>> resultMap: result) {
+                    boolean matched = false;
+                    for (String key: resultMap.keySet()) {
+                        if (key.equals(afterMatchSkipStrategy.getPatternName())) {
+                            matched = true;
+                            discardEvents.addAll(resultMap.get(key).subList(0, resultMap.get(key).size() - 1));
+                        } else if (!matched) {
+                            discardEvents.addAll(resultMap.get(key));
+                        }
+                    }
+                }
+                break;
+            case SKIP_TO_FIRST:
+                for (Map<String, List<T>> resultMap: result) {
--- End diff --

Similar to the above.
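The `SKIP_TO_LAST` branch in the diff above can be restated as a standalone sketch. It mirrors the loop's logic but is not the pull request's code: `SkipToLastSketch` and `discardBeforeLast` are hypothetical names, and the `matched` flag only gives the intended result under the assumption that the map iterates its keys in pattern order (for example a `LinkedHashMap`).

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SkipToLastSketch {

    /**
     * Collects every event that precedes the last event mapped to patternName.
     * As in the diff, a match that does not contain patternName at all has
     * every one of its events discarded, because the flag is never set.
     */
    static <T> Set<T> discardBeforeLast(Map<String, List<T>> match, String patternName) {
        Set<T> discard = new LinkedHashSet<>();
        boolean matched = false;
        for (Map.Entry<String, List<T>> entry : match.entrySet()) {
            List<T> events = entry.getValue();
            if (entry.getKey().equals(patternName)) {
                matched = true;
                // Keep only the last event of the target pattern; the guard
                // avoids a negative index for an empty list.
                discard.addAll(events.subList(0, Math.max(0, events.size() - 1)));
            } else if (!matched) {
                // Patterns before the target are discarded completely.
                discard.addAll(events);
            }
        }
        return discard;
    }
}
```

For a match `a=[1, 2]`, `b=[3, 4, 5]`, `c=[6]` and target `b`, the discarded events are 1, 2, 3 and 4; event 5 (the last `b`) and event 6 remain available for the next match.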
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119859#comment-16119859 ]

ASF GitHub Bot commented on FLINK-7169:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132134129

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119868#comment-16119868 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132175418 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java --- @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep; + +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; +import org.apache.flink.cep.pattern.MalformedPatternException; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.List; +import java.util.Map; + +/** + * After match skip tests. + */ +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath; + private String expected; + + private String lateEventPath; + private String expectedLateEvents; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + expected = ""; + + lateEventPath = tempFolder.newFile().toURI().toString(); + expectedLateEvents = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + compareResultsByLinesInMemory(expectedLateEvents, lateEventPath); + } + + private PatternSelectFunctionnewIdSelectFunction(String ... 
names) { + return new PatternSelectFunction () { + + @Override + public String select(Map pattern) { + StringBuilder builder = new StringBuilder(); + for (String name: names) { + for (Event e : pattern.get(name)) { + builder.append(e.getId()).append(","); + } + } + return builder.toString(); + } + }; + } + + @Test + public void testSkipToNext() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new Event(1, "a", 0.0), + new Event(2, "a", 0.0), + new Event(3, "a", 0.0), + new Event(4, "a", 0.0), + new Event(5, "a", 0.0), + new Event(6, "a", 0.0) + ); + Pattern pattern = Pattern.begin("start", + new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT)) + .where(new SimpleCondition() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(3); + DataStream result = CEP.pattern(input,
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119851#comment-16119851 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132132564

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_PATTERN and
    + * SKIP_TO_LAST_PATTERN
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +    // fields
    +    String patternName = null;
    +
    +    public AfterMatchSkipStrategy(){
    +        this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +        this(strategy, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    --- End diff --

    I would change the constructors into factory methods. I think that way the requirements for `patternName` will be more visible and the code will be less bloated.

    I have something like this in mind:

        public static AfterMatchSkipStrategy skipToFirst(String patternName) {
            return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
        }

        public static AfterMatchSkipStrategy skipToLast(String patternName) {
            return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
        }

        public static AfterMatchSkipStrategy skipPastLastEvent() {
            return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
        }

        public static AfterMatchSkipStrategy skipToNextEvent() {
            return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
        }

    Then the usage will be:

        AfterMatchSkipStrategy.skipToLast("end")

    instead of:

        new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")
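The factory-method suggestion above can be sketched as a standalone class. This is only an illustration under stated assumptions, not the code that was merged: `SkipStrategySketch` and `Kind` are hypothetical stand-ins for `AfterMatchSkipStrategy` and its `SkipStrategy` enum, and the constructor is made private here (an extra step the reviewer did not ask for) so that the factories are the only entry points.

```java
import java.util.Objects;

public final class SkipStrategySketch {

    /** Hypothetical mirror of the SkipStrategy enum quoted in the diff. */
    public enum Kind { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final Kind kind;
    // null unless kind is SKIP_TO_FIRST or SKIP_TO_LAST
    private final String patternName;

    // Private on purpose: callers have to use a factory, so a pattern name
    // cannot be left out for the two strategies that require one.
    private SkipStrategySketch(Kind kind, String patternName) {
        this.kind = kind;
        this.patternName = patternName;
    }

    public static SkipStrategySketch skipToFirst(String patternName) {
        return new SkipStrategySketch(
            Kind.SKIP_TO_FIRST, Objects.requireNonNull(patternName, "patternName"));
    }

    public static SkipStrategySketch skipToLast(String patternName) {
        return new SkipStrategySketch(
            Kind.SKIP_TO_LAST, Objects.requireNonNull(patternName, "patternName"));
    }

    public static SkipStrategySketch skipPastLastEvent() {
        return new SkipStrategySketch(Kind.SKIP_PAST_LAST_EVENT, null);
    }

    public static SkipStrategySketch skipToNextEvent() {
        return new SkipStrategySketch(Kind.SKIP_TO_NEXT_EVENT, null);
    }

    public Kind getKind() {
        return kind;
    }

    public String getPatternName() {
        return patternName;
    }

    public static void main(String[] args) {
        SkipStrategySketch strategy = SkipStrategySketch.skipToLast("end");
        System.out.println(strategy.getKind() + " on pattern " + strategy.getPatternName());
    }
}
```

Because each factory's signature states whether a pattern name is needed, the runtime check in the three-argument constructor quoted in the diff would only have to guard against `null`.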
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119856#comment-16119856 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132169303

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    +        private void checkPatternSkipStrategy() {
    +            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +            if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +                afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +                Pattern<T, ?> pattern = currentPattern;
    +                while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +                    pattern = pattern.getPrevious();
    +                }
    +                // pattern name match check.
    +                if (pattern == null) {
    --- End diff --

    This check can never trigger: when the loop exits normally, `pattern` is always non-null. If `pattern.getPrevious()` returns `null`, the `while` loop throws a `NullPointerException` before the check is reached. A test for this case is missing.
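One way to address the review comment on `checkPatternSkipStrategy` is to test for `null` in the loop condition itself. The sketch below is self-contained and does not use Flink classes: `Node` is a hypothetical stand-in for a pattern linked to its predecessor, and `IllegalArgumentException` is used as a placeholder for whatever exception the library would actually throw.

```java
public class PatternLookupSketch {

    /** Minimal stand-in for a pattern that links to its predecessor. */
    public static final class Node {
        final String name;
        final Node previous;

        public Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }

        public String getName() {
            return name;
        }
    }

    /**
     * Walks backwards from {@code last} until a node with the given name is found.
     * The null test sits in the loop condition, so running off the start of the
     * chain ends the loop instead of raising a NullPointerException.
     */
    public static Node findByName(Node last, String name) {
        Node current = last;
        while (current != null && !current.name.equals(name)) {
            current = current.previous;
        }
        if (current == null) {
            // Reachable now: no pattern in the chain carries the requested name.
            throw new IllegalArgumentException("no pattern named '" + name + "'");
        }
        return current;
    }

    public static void main(String[] args) {
        Node start = new Node("start", null);
        Node middle = new Node("middle", start);
        Node end = new Node("end", middle);
        System.out.println(findByName(end, "start").getName());
    }
}
```

A test for the missing case would call `findByName` with a name that is absent from the chain and expect the exception.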
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119858#comment-16119858 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132166853

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -322,6 +336,64 @@ public void resetNFAChanged() {
             }

    +        Set<T> discardEvents = new HashSet<>();
    --- End diff --

    Move the whole added block to a new method. The `process` method is already long.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119865#comment-16119865 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132176196 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.List; + + +/** + * Indicate the skip strategy after a match process. 
+ * There're four kinds of strategies: + * SKIP_PAST_LAST_EVENT, + * SKIP_TO_NEXT_EVENT, + * SKIP_TO_FIRST_PATTERN and + * SKIP_TO_LAST_PATTERN + * + */ +public class AfterMatchSkipStrategy implements Serializable { + + // default strategy + SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT; + + // fields + String patternName = null; + + public AfterMatchSkipStrategy(){ + this(SkipStrategy.SKIP_TO_NEXT_EVENT, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy) { + this(strategy, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) { + if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) { + throw new IllegalArgumentException("the patternName field can not be empty when SkipStrategy is " + strategy); + } + this.strategy = strategy; + this.patternName = patternName; + } + + public SkipStrategy getStrategy() { + return strategy; + } + + public String getPatternName() { + return patternName; + } + + @Override + public String toString() { + return "AfterMatchStrategy{" + + "strategy=" + strategy + + ", patternName=" + patternName + + '}'; + } + + /** +* Skip Strategy Enum. +*/ + public enum SkipStrategy{ + SKIP_TO_NEXT_EVENT, + SKIP_PAST_LAST_EVENT, + SKIP_TO_FIRST, + SKIP_TO_LAST + } + + /** +* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. +*/ + public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** +* This empty constructor is required for deserializing the configuration. +*/ + public AfterMatchSkipStrategyConfigSnapshot() { + } + + public
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119854#comment-16119854 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132168886

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
             }

             /**
    +         * Check pattern after match skip strategy.
    +         */
    +        private void checkPatternSkipStrategy() {
    +            AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    --- End diff --

    Just use `this.afterMatchSkipStrategy`
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119866#comment-16119866 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132175365 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java --- @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep; + +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; +import org.apache.flink.cep.pattern.MalformedPatternException; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.List; +import java.util.Map; + +/** + * After match skip tests. + */ +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase { + + private String resultPath; + private String expected; + + private String lateEventPath; + private String expectedLateEvents; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + expected = ""; + + lateEventPath = tempFolder.newFile().toURI().toString(); + expectedLateEvents = ""; + } + + @After + public void after() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + compareResultsByLinesInMemory(expectedLateEvents, lateEventPath); + } + + private PatternSelectFunctionnewIdSelectFunction(String ... 
names) { + return new PatternSelectFunction () { + + @Override + public String select(Map pattern) { + StringBuilder builder = new StringBuilder(); + for (String name: names) { + for (Event e : pattern.get(name)) { + builder.append(e.getId()).append(","); + } + } + return builder.toString(); + } + }; + } + + @Test + public void testSkipToNext() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream input = env.fromElements( + new Event(1, "a", 0.0), + new Event(2, "a", 0.0), + new Event(3, "a", 0.0), + new Event(4, "a", 0.0), + new Event(5, "a", 0.0), + new Event(6, "a", 0.0) + ); + Pattern pattern = Pattern.begin("start", + new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT)) + .where(new SimpleCondition() { + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).times(3); + DataStream result = CEP.pattern(input,
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119852#comment-16119852 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132139074

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -322,6 +336,64 @@ public void resetNFAChanged() {
             }

    +        Set<T> discardEvents = new HashSet<>();
    +        switch(afterMatchSkipStrategy.getStrategy()) {
    +            case SKIP_TO_LAST:
    +                for (Map<String, List<T>> resultMap: result) {
    +                    boolean matched = false;
    --- End diff --

    How about:

        for (Map<String, List<T>> resultMap: result) {
            for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
                if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
                    discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
                    break;
                } else {
                    discardEvents.addAll(keyMatches.getValue());
                }
            }
        }

    This way we stop as soon as we reach the matching pattern.
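The loop proposed in the review comment above can be tried out in isolation. The sketch below is a minimal standalone version under stated assumptions: `DiscardSketch` and `discardForSkipToLast` are hypothetical names, each match is assumed to be a map that iterates in pattern order (a `LinkedHashMap` here), and an emptiness guard is added that the original suggestion does not have.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DiscardSketch {

    /**
     * Collects the events to discard for a "skip to last" strategy: every event
     * of the patterns that come before the named pattern, plus all but the last
     * event of the named pattern itself. Assumes each map iterates in pattern order.
     */
    public static <T> Set<T> discardForSkipToLast(List<Map<String, List<T>>> matches, String patternName) {
        Set<T> discard = new HashSet<>();
        for (Map<String, List<T>> match : matches) {
            for (Map.Entry<String, List<T>> entry : match.entrySet()) {
                List<T> events = entry.getValue();
                if (entry.getKey().equals(patternName)) {
                    if (!events.isEmpty()) {
                        // keep only the last event mapped to the named pattern
                        discard.addAll(events.subList(0, events.size() - 1));
                    }
                    break; // events of later patterns are kept
                }
                discard.addAll(events);
            }
        }
        return discard;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList(1, 2));
        match.put("middle", Arrays.asList(3, 4, 5));
        match.put("end", Arrays.asList(6));
        System.out.println(discardForSkipToLast(Collections.singletonList(match), "middle"));
    }
}
```

The early `break` is only correct if the map preserves pattern order; with an unordered map, events from later patterns could be visited first and discarded by mistake.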
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118321#comment-16118321 ]

Dawid Wysakowicz commented on FLINK-7169:
-----------------------------------------

Hi [~ychen],
Sorry for the delay. I will definitely review it this week.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118297#comment-16118297 ]

Yueting Chen commented on FLINK-7169:
-------------------------------------

[~dawidwys] [~dian.fu] I updated the PR several days ago. Could you please help review it? Thanks.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114077#comment-16114077 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331

    @dawidwys @dianfu I've updated the approach according to the document. Feel free to comment.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107093#comment-16107093 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331

    @dianfu Thanks for reviewing. I found that @dawidwys wrote a draft about the implementation of this JIRA. I'll go through that first and address those issues in this PR later.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107090#comment-16107090 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317270

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    --- End diff --

    Thanks for pointing it out.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107091#comment-16107091 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317411

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    +                    resultingComputationStates.add(createStartComputationState(computationState, event));
    +                }
    +                break;
    +            case SKIP_TO_FIRST:
    +                if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +                    !nextState.getName().equals(currentState.getName())) {
    +                    ComputationState startComputationState = createStartComputationState(computationState, event);
    +                    if (callLevel > 0) {
    --- End diff --

    Because we need to detect whether there is an infinite loop. I use callLevel to track that here.
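Editor's sketch: the depth guard described in the reply above can be illustrated in isolation. The class below is a hypothetical stand-in, not Flink's `NFA` code; `ReFeedGuardSketch` and the `restartsAgain` flag are assumptions made for illustration. It also assumes the callee is meant to see an incremented depth. In Java, passing `callLevel++` as an argument hands the callee the old value, so the sketch passes `callLevel + 1` instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the recursive re-feed in the quoted diff.
final class ReFeedGuardSketch {
    /**
     * Processes an event and, at the top level, re-feeds it once to a fresh start state.
     * restartsAgain simulates a pattern whose restart immediately asks for another restart.
     */
    static List<String> computeNextStates(String event, boolean restartsAgain, int callLevel) {
        List<String> result = new ArrayList<>();
        result.add("level " + callLevel + " processed " + event);
        boolean mustRestart = callLevel == 0 || restartsAgain;
        if (mustRestart) {
            if (callLevel > 0) {
                // A restart requested from inside a restart would never terminate.
                throw new IllegalStateException("skip strategy would loop forever on " + event);
            }
            // callLevel + 1, not callLevel++: a post-increment would pass 0 to the callee.
            result.addAll(computeNextStates(event, restartsAgain, callLevel + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(computeNextStates("e1", false, 0));
        try {
            computeNextStates("e1", true, 0);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A well-behaved pattern yields two entries (the original pass and one re-feed), while a pattern that restarts again is rejected on the nested call.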
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107088#comment-16107088 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317122

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    +                    resultingComputationStates.add(createStartComputationState(computationState, event));
    +                }
    +                break;
    +            case SKIP_TO_FIRST:
    +                if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    --- End diff --

    Yes, you are right.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107012#comment-16107012 ]

Dawid Wysakowicz commented on FLINK-7169:
-----------------------------------------

Hi [~ychen],
I've tried to put some of my ideas regarding this issue into a Google doc: https://docs.google.com/document/d/1XHgn5FXHukcv9VzWpQeSLhh-6adJ-IH2hZ_CwytJvfo/edit?usp=sharing

I think the most important section is the examples, where I've put some tricky (at least for me ;) ) cases. Happy to see your comments. It would also be great if we could add some more corner cases.

Also, [~kkl0u] and [~dian.fu], it would be great if you could add some thoughts.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106770#comment-16106770 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130264936

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    +                    resultingComputationStates.add(createStartComputationState(computationState, event));
    +                }
    +                break;
    +            case SKIP_TO_FIRST:
    +                if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    --- End diff --

    This should use NFAStateNameHandler.getOriginalNameFromInternal() to compare state names.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106772#comment-16106772 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130264164

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    --- End diff --

    This should also consider the **Proceed to Final state** situation.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106769#comment-16106769 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130266394

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    +                    resultingComputationStates.add(createStartComputationState(computationState, event));
    +                }
    +                break;
    +            case SKIP_TO_FIRST:
    +                if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +                    !nextState.getName().equals(currentState.getName())) {
    +                    ComputationState startComputationState = createStartComputationState(computationState, event);
    +                    if (callLevel > 0) {
    +                        throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +                    }
    +                    // feed current matched event to the state.
    +                    Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    +                    resultingComputationStates.addAll(computationStates);
    +                } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
    +                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +                }
    +                break;
    +            case SKIP_TO_LAST:
    +                if (currentState.getName().equals(skipStrategy.getPatternName()) &&
    +                    !nextState.getName().equals(currentState.getName())) {
    +                    ComputationState startComputationState = createStartComputationState(computationState, event);
    +                    if (callLevel > 0) {
    +                        throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +                    }
    +                    // feed current matched event to the state.
    +                    Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    +                    resultingComputationStates.addAll(computationStates);
    +                }
    +                break;
    +        }
             break;
         }
     }

    - if (computationState.isStartState()) {
    -     int totalBranches = calculateIncreasingSelfState(
    -             outgoingEdges.getTotalIgnoreBranches(),
    -             outgoingEdges.getTotalTakeBranches());
    -
    -     DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
    -     ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
    -
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106771#comment-16106771 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130265354

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                     nextVersion,
                     startTimestamp);
             }
    +
    +        switch (skipStrategy.getStrategy()) {
    +            case SKIP_PAST_LAST_EVENT:
    +                if (nextState.isFinal()) {
    +                    resultingComputationStates.add(createStartComputationState(computationState, event));
    +                }
    +                break;
    +            case SKIP_TO_FIRST:
    +                if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +                    !nextState.getName().equals(currentState.getName())) {
    +                    ComputationState startComputationState = createStartComputationState(computationState, event);
    +                    if (callLevel > 0) {
    --- End diff --

    Why is the callLevel needed?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106008#comment-16106008 ]

Yueting Chen commented on FLINK-7169:
-------------------------------------

Hi [~dawidwys],
Regarding the empty match issue, I am not sure I get your point. Could you show me an example, if possible?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104892#comment-16104892 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user litrain commented on the issue:

    https://github.com/apache/flink/pull/4331

    @dawidwys @yestinchen Thanks for your discussion. I am also working on the empty match issue now. Please have a look at https://issues.apache.org/jira/browse/FLINK-7292.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104735#comment-16104735 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4331

    Hi @yestinchen,
    Thanks for your feedback. Right now, I just want to address the empty match issue. It is not easy to apply that definition to unbounded data. In the SQL specification, at the end of a partition we can deterministically decide whether a partial match is empty or not. That is not the case with unbounded data, as future arriving events may make the partial matches empty. Those are the nuances I think should be addressed first, but I agree there are many similarities.

    I will try to address the rest later during the day, but I think we should continue in the JIRA.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104350#comment-16104350 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4331

    Sorry for the late response. I think this feature is very useful, and I agree that we should have a clear idea of how each skip strategy should behave. I noticed that there are already some discussions in FLINK-3703 which we can refer to. I will take a look at this PR and at FLINK-3703 over the next two days and will post my thoughts.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104311#comment-16104311 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331

@dawidwys Thanks for the review.

Problem 1 is easy to fix: we can just start a new match process if the only remaining computation state reaches `stopState`.

Problem 2 cannot be avoided with the current approach, because it is impossible to know whether there are potential matches. I think the best way to implement this correctly is to try to start a new match process after processing each event, and to discard unfinished match processes after a successful match according to the skip strategy. To do that, we need to keep the logical order of the events, which is the original idea I proposed.

As for your general notes, I have some ideas:

1. I agree that Oracle's specification is designed for bounded data. But MATCH_RECOGNIZE on unbounded data is very similar to the bounded case, since the data is processed one event at a time and no bound information is needed. As for **_empty match_**, I think we can just use Oracle's definition:

> Some patterns permit empty matches. For example: PATTERN (A*) can be matched by zero or more rows that are mapped to A. An empty match does not map any rows to primary row pattern variables; nevertheless, an empty match has a starting row. For example, there can be an empty match at the first row of a row pattern partition, an empty match at the second row of a row pattern partition, etc. An empty match is assigned a sequential match number, based on the ordinal position of its starting row, the same as any other match.

2. I feel uncomfortable with the RuntimeExceptions too. But these exceptions are very important for keeping the skip semantics right. I understand your main concern is that exceptions will stop the matching process, which is unacceptable for an online streaming service. To address this, I think we can introduce a default strategy (SKIP_TO_NEXT_EVENT, for example). If one of these exceptions happens, we can use the default strategy to continue the match process, and change the strategy back after a successful match. We can also add a switch to let the user decide whether to enable this feature.

3. I still think it's useful to support these skip strategies. I don't know why Esper does not support them.

4. Thanks for the related information. I took a brief look at that PR, which is very similar to this one. I wonder why it was closed without being merged into master?

Looking forward to your feedback. Thanks.
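The fallback idea in point 2 of the reply above (continue with a default strategy instead of throwing) can be sketched as follows. This is only an illustration under stated assumptions, not the code from the pull request: the class `SkipFallbackSketch`, the method `resumeIndex`, and the representation of a match as a map from pattern name to ascending event positions are all hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from the Flink code base.
final class SkipFallbackSketch {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream position at which matching should resume.
     * A match maps each pattern name to the positions of its events, in ascending order.
     * Where the PR would throw a RuntimeException, this falls back to SKIP_TO_NEXT_EVENT.
     */
    static int resumeIndex(Strategy strategy, String patternName, Map<String, List<Integer>> match) {
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        for (List<Integer> positions : match.values()) {
            for (int position : positions) {
                first = Math.min(first, position);
                last = Math.max(last, position);
            }
        }
        if (first > last) {
            throw new IllegalArgumentException("this sketch only handles non-empty matches");
        }
        int fallback = first + 1; // SKIP_TO_NEXT_EVENT: the event after the first event of the match
        switch (strategy) {
            case SKIP_PAST_LAST_EVENT:
                return last + 1;
            case SKIP_TO_FIRST:
            case SKIP_TO_LAST:
                List<Integer> mapped = patternName == null ? null : match.get(patternName);
                if (mapped == null || mapped.isEmpty()) {
                    return fallback; // nothing was mapped to the pattern, e.g. `A B? C` on `a c`
                }
                int target = strategy == Strategy.SKIP_TO_FIRST
                        ? mapped.get(0)
                        : mapped.get(mapped.size() - 1);
                // Resuming at the first event of the match would find the same match again.
                return target > first ? target : fallback;
            default:
                return fallback;
        }
    }
}
```

For the pattern `A B? C` with SKIP_TO_LAST `B` and the sequence `a c`, the match contains no event for `B`, so this sketch resumes at the event after `a` rather than aborting. Whether a silent fallback or an opt-in switch is the better behaviour is the open question raised in the reply.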
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16103166#comment-16103166 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4331

Hi @yestinchen,

Thanks for the update. After a second round of review, I found several problems with the current approach. In most cases it returns only the first match in a stream.

1. Let's analyze the pattern `A B C` with `SKIP_TO_FIRST C` and the sequence `a1 b1 c1 a2 b2 c2`. It will return only `a1 b1 c1` and will leave the NFA without any valid `ComputationState`s, which stops processing.

2. Another problem is that we do not handle matches that can potentially finish before a previously started one. E.g. for the pattern

```
Pattern<Event, ?> pattern = Pattern.<Event>begin("ab").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("a") || value.getName().equals("b");
    }
}).followedBy("c").where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        return value.getName().equals("c") && ctx.getEventsForPattern("ab").iterator().next().getPrice() == value.getPrice();
    }
}).setAfterMatchSkipStrategy(new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT));
```

and the sequence `a(price = 1) b(price = 2) c(price = 2)`, I think the desired behaviour would be to start new matching after the `c` event, but it won't, as matching started at `a` and will not start at `b`.

---

Some general notes:

1. I think the SQL specification does not fit the CEP library well, as we do not operate on a partition/bounded collection of events, whereas the specification assumes such bounded data. I think we would benefit from some additional documentation on how the AFTER MATCH clause works for unbounded data, e.g. what **_empty match_** means:

> Note that the AFTER MATCH SKIP syntax only determines the point to resume scanning for a match after a non-empty match. When an empty match is found, one row is skipped (as if SKIP TO NEXT ROW had been specified). Thus an empty match never causes one of these exceptions.

etc.

2. I really don't like that there are so many cases in which a `RuntimeException` can be thrown. I feel the reason for using CEP is constantly running jobs that search for patterns in a stream, rather than ad-hoc queries. E.g. for a pattern like `A B? C` with `SKIP_TO_LAST B`, a sequence like `a c` results in an exception and the job being killed. In my opinion that does not fit a constantly running job. From the operational side, running such patterns would be at least interesting ;), as they depend so much on the arriving data.

3. I don't know the reasoning, but Esper, which was mentioned as the other library (besides Oracle) that supports the `MATCH_RECOGNIZE` clause, does not support `AFTER MATCH` at all.

4. I found out there was already ongoing work to introduce part of `AFTER MATCH` (the `SKIP_PAST_LAST`). The corresponding JIRA: https://issues.apache.org/jira/browse/FLINK-3703 and the closed PR: #2367.

To sum up, thanks @yestinchen for the work. Unfortunately I think the clause needs a little more conceptual discussion before we can introduce this change. I think the `SKIP_PAST_LAST` behaviour would be very helpful (in fact there have already been requests for it on the mailing list) and the most straightforward to implement.

I would love to hear your opinions, @yestinchen as well as @kl0u and @dianfu.
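Problem 2 in the review above can be made concrete with a small stand-alone model. The sketch below does not use the Flink NFA. It assumes a hypothetical `Event` with a name and an integer price, keeps one open candidate per `a`/`b` event, and drops all unfinished candidates once a `c` event completes a match, which is one possible reading of SKIP_PAST_LAST_EVENT.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "start a candidate match at every event"; not the Flink NFA.
final class StartPerEventSketch {

    static final class Event {
        final String name;
        final int price;

        Event(String name, int price) {
            this.name = name;
            this.price = price;
        }
    }

    /** Returns each match as {position of the "ab" event, position of the "c" event}. */
    static List<int[]> match(List<Event> events) {
        List<int[]> matches = new ArrayList<>();
        List<Integer> open = new ArrayList<>(); // positions of "ab" events with an unfinished match
        for (int i = 0; i < events.size(); i++) {
            Event event = events.get(i);
            if (event.name.equals("a") || event.name.equals("b")) {
                open.add(i); // every qualifying event starts its own candidate
            } else if (event.name.equals("c")) {
                Integer completed = null;
                for (int start : open) {
                    if (events.get(start).price == event.price) {
                        completed = start; // earliest candidate whose price condition holds
                        break;
                    }
                }
                if (completed != null) {
                    matches.add(new int[] {completed, i});
                    open.clear(); // SKIP_PAST_LAST_EVENT: discard all unfinished candidates
                }
            }
        }
        return matches;
    }
}
```

For `a(price = 1) b(price = 2) c(price = 2)` this model reports the match `b c`: the candidate that started at `a` never completes, while the one that started later at `b` does. That is the case the single-start approach in the PR misses.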
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101390#comment-16101390 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129518272

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
--- End diff --

Now I keep `startComputationState` instead of `startState` in NFA, so it can calculate the `outgoingEdges` from the start state when needed. Is this right?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101388#comment-16101388 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517254

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ *
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+    // default strategy
+    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+    // fields
+    String rpv = null;
+
+    public AfterMatchSkipStrategy(){
+        this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy) {
+        this(strategy, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+        if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+            if (rpv == null) {
+                throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
+            }
+        }
+        this.strategy = strategy;
+        this.rpv = rpv;
+    }
+
+    public SkipStrategy getStrategy() {
+        return strategy;
+    }
+
+    public String getRpv() {
+        return rpv;
+    }
+
+    @Override
+    public String toString() {
+        return "AfterMatchStrategy{" +
+            "strategy=" + strategy +
+            ", rpv=" + rpv +
+            '}';
+    }
+
+    /**
+     * Skip Strategy Enum.
+     */
+    public enum SkipStrategy{
+        SKIP_TO_NEXT_ROW,
+        SKIP_PAST_LAST_ROW,
+        SKIP_TO_FIRST,
+        SKIP_TO_LAST
+    }
+
+    /**
+     * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+     */
+    public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+        private static final int VERSION = 1;
+
+        /**
+         * This empty constructor is required for deserializing the configuration.
+         */
+        public AfterMatchSkipStrategyConfigSnapshot() {
+        }
+
+        public AfterMatchSkipStrategyConfigSnapshot(
+            TypeSerializer enumSerializer,
+            TypeSerializer stringSerializer) {
+
+            super(enumSerializer, stringSerializer);
+        }
+
+        @Override
+        public int getVersion() {
+            return
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101386#comment-16101386 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517161

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ *
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+    // default strategy
+    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+    // fields
+    String rpv = null;
--- End diff --

It means Row Pattern Variable. I have already changed it to `patternName`, which I thought would be clearer.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101387#comment-16101387 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517227

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ *
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+    // default strategy
+    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+    // fields
+    String rpv = null;
+
+    public AfterMatchSkipStrategy(){
+        this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy) {
+        this(strategy, null);
+    }
+
+    public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+        if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+            if (rpv == null) {
--- End diff --

done
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101381#comment-16101381 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516786

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
--- End diff --

I changed `ROW` to `EVENT`. Is that better?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101368#comment-16101368 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516650

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
@@ -36,6 +40,14 @@
      * @return Resulting pattern stream
      */
     public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
-        return new PatternStream<>(input, pattern);
+        return new PatternStream<>(input, pattern, skipStrategy);
+    }
+
+    /**
+     * Set the pattern's skip strategy after match.
+     * @param afterMatchSkipStrategy the skip strategy to use.
+     */
+    public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
--- End diff --

I moved that into `Pattern`.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101358#comment-16101358 ]

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129515022

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
+                if (callLevel > 0) {
+                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                }
+                // feed current matched event to the state.
+                Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
--- End diff --

Because SKIP_TO_FIRST and SKIP_TO_LAST need to start the next match process at the first or last event matched by the specified pattern. For example, take the event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If we set the skip strategy to SKIP_TO_FIRST with the pattern name `B`, we should create a new `startComputationState` after `b1` has been processed, and the next match should start at event `b1`. So we need to manually feed `b1` to the newly created `startComputationState`.
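The resume rule described above can be checked against a brute-force scan over a bounded list of events. The sketch below is an assumption-laden reference model, not the NFA code: `SkipToFirstScanSketch` and `matchStarts` are hypothetical names, the pattern is strictly contiguous, and an event such as `b1` is taken to belong to pattern `B` when it starts with the lower-case pattern name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reference model for SKIP_TO_FIRST over a bounded event list.
final class SkipToFirstScanSketch {

    /** Returns the start position of every match of the contiguous pattern. */
    static List<Integer> matchStarts(List<String> events, List<String> pattern, String skipTo) {
        int offset = pattern.indexOf(skipTo);
        if (offset <= 0) {
            // Unknown name, or the first pattern: resuming there would repeat the same match forever.
            throw new IllegalArgumentException("skipTo must name a pattern after the first one");
        }
        List<Integer> starts = new ArrayList<>();
        int pos = 0;
        while (pos + pattern.size() <= events.size()) {
            if (matchesAt(events, pattern, pos)) {
                starts.add(pos);
                pos += offset; // resume at the event mapped to skipTo, which is examined again
            } else {
                pos++;
            }
        }
        return starts;
    }

    private static boolean matchesAt(List<String> events, List<String> pattern, int pos) {
        for (int k = 0; k < pattern.size(); k++) {
            if (!events.get(pos + k).startsWith(pattern.get(k).toLowerCase())) {
                return false;
            }
        }
        return true;
    }
}
```

With `a1, b1, c1, a2`, the pattern `A B C` and `skipTo = "B"`, the scan finds the match at position 0 and then resumes at `b1`, which is why the event has to be fed to the matcher a second time. With `a1 b1 c1 a2 b2 c2` and `skipTo = "C"`, the same scan finds matches at positions 0 and 3, so a correct implementation should report both.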
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100039#comment-16100039 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129298052

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_RPV and
    + * SKIP_TO_LAST_RPV
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +    // fields
    +    String rpv = null;
    +
    +    public AfterMatchSkipStrategy(){
    +        this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +        this(strategy, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +        if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +            if (rpv == null) {
    +                throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
    +            }
    +        }
    +        this.strategy = strategy;
    +        this.rpv = rpv;
    +    }
    +
    +    public SkipStrategy getStrategy() {
    +        return strategy;
    +    }
    +
    +    public String getRpv() {
    +        return rpv;
    +    }
    +
    +    @Override
    +    public String toString() {
    +        return "AfterMatchStrategy{" +
    +            "strategy=" + strategy +
    +            ", rpv=" + rpv +
    +            '}';
    +    }
    +
    +    /**
    +     * Skip Strategy Enum.
    +     */
    +    public enum SkipStrategy{
    +        SKIP_TO_NEXT_ROW,
    +        SKIP_PAST_LAST_ROW,
    +        SKIP_TO_FIRST,
    +        SKIP_TO_LAST
    +    }
    +
    +    /**
    +     * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +     */
    +    public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +        private static final int VERSION = 1;
    +
    +        /**
    +         * This empty constructor is required for deserializing the configuration.
    +         */
    +        public AfterMatchSkipStrategyConfigSnapshot() {
    +        }
    +
    +        public AfterMatchSkipStrategyConfigSnapshot(
    +            TypeSerializer enumSerializer,
    +            TypeSerializer stringSerializer) {
    +
    +            super(enumSerializer, stringSerializer);
    +        }
    +
    +        @Override
    +        public int getVersion() {
    +            return VERSION;
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100041#comment-16100041 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296835

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_RPV and
    + * SKIP_TO_LAST_RPV
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +    // fields
    +    String rpv = null;
    --- End diff --

    What does `rpv` mean?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100040#comment-16100040 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129303933

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                 nextVersion,
                 startTimestamp);
         }
    +
    +    switch (skipStrategy.getStrategy()) {
    +        case SKIP_PAST_LAST_ROW:
    +            if (nextState.isFinal()) {
    +                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +            }
    +            break;
    +        case SKIP_TO_FIRST:
    +            if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +                !nextState.getName().equals(currentState.getName())) {
    +                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
    +                if (callLevel > 0) {
    +                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +                }
    +                // feed current matched event to the state.
    +                Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    --- End diff --

    Why do we feed the event?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100038#comment-16100038 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129297040

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_RPV and
    + * SKIP_TO_LAST_RPV
    + *
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +    // default strategy
    +    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +    // fields
    +    String rpv = null;
    +
    +    public AfterMatchSkipStrategy(){
    +        this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +        this(strategy, null);
    +    }
    +
    +    public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +        if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +            if (rpv == null) {
    --- End diff --

    Why not put it into the previous `if`?
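The merge suggested in the review comment above could look like the following minimal sketch. `SkipStrategySketch` is a hypothetical, trimmed-down stand-in for the class under review, not the code that was eventually merged; only the enum constants and the exception message are taken from the diff.

```java
/** Hypothetical stand-in for the AfterMatchSkipStrategy class shown in the diff. */
final class SkipStrategySketch {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    private final String rpv; // name of the pattern variable to skip to; may be null

    SkipStrategySketch(SkipStrategy strategy, String rpv) {
        // One combined condition replaces the two nested ifs from the diff.
        boolean needsRpv = strategy == SkipStrategy.SKIP_TO_FIRST
            || strategy == SkipStrategy.SKIP_TO_LAST;
        if (needsRpv && rpv == null) {
            throw new IllegalArgumentException(
                "the rpv field can not be empty when SkipStrategy is " + strategy);
        }
        this.strategy = strategy;
        this.rpv = rpv;
    }

    SkipStrategy getStrategy() {
        return strategy;
    }

    String getRpv() {
        return rpv;
    }

    public static void main(String[] args) {
        // A strategy that needs a variable name and gets one.
        System.out.println(new SkipStrategySketch(SkipStrategy.SKIP_TO_FIRST, "b").getRpv());
        // A strategy that needs a variable name but gets none is rejected.
        try {
            new SkipStrategySketch(SkipStrategy.SKIP_TO_LAST, null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Behaviour is unchanged compared with the nested version; the only difference is that the precondition reads as a single statement.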
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100036#comment-16100036 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129303049

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
                 nextVersion,
                 startTimestamp);
         }
    +
    +    switch (skipStrategy.getStrategy()) {
    +        case SKIP_PAST_LAST_ROW:
    +            if (nextState.isFinal()) {
    +                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +            }
    +            break;
    +        case SKIP_TO_FIRST:
    +            if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +                !nextState.getName().equals(currentState.getName())) {
    +                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
    --- End diff --

    The `outgoingEdges` parameter in this case will not work. Previously the assumption was that we created the new `start` state always in the starting state. So we could calculate the version for new run based on outgoing edges from the start state. In this case the outgoingEdges do not correspond to the starting state.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100035#comment-16100035 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296661

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    --- End diff --

    There is no notion of ROW in CEP library.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100037#comment-16100037 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296465

    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
    @@ -36,6 +40,14 @@
          * @return Resulting pattern stream
          */
         public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
    -        return new PatternStream<>(input, pattern);
    +        return new PatternStream<>(input, pattern, skipStrategy);
    +    }
    +
    +    /**
    +     * Set the pattern's skip strategy after match.
    +     * @param afterMatchSkipStrategy the skip strategy to use.
    +     */
    +    public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
    --- End diff --

    As @dianfu said in the corresponding JIRA, the `AfterMatchSkipStrategy` should be part of the `Pattern` not `PatternStream`.
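One way to read the suggestion above is that the skip strategy travels with each pattern instead of being set through a static method on `CEP`, which would apply to every pattern created afterwards. The following is only a sketch under that reading: `PatternSketch`, `begin` and `withSkipStrategy` are hypothetical names for illustration and are not Flink's `Pattern` API.

```java
/** Hypothetical stand-in for a pattern that carries its own skip strategy. */
final class PatternSketch {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final String name;
    private final SkipStrategy skipStrategy;

    private PatternSketch(String name, SkipStrategy skipStrategy) {
        this.name = name;
        this.skipStrategy = skipStrategy;
    }

    /** Starts a pattern with the default option proposed in the issue description. */
    static PatternSketch begin(String name) {
        return new PatternSketch(name, SkipStrategy.SKIP_TO_NEXT_ROW);
    }

    /** Returns a copy with a different strategy; other patterns are not affected. */
    PatternSketch withSkipStrategy(SkipStrategy strategy) {
        return new PatternSketch(name, strategy);
    }

    String getName() {
        return name;
    }

    SkipStrategy getSkipStrategy() {
        return skipStrategy;
    }

    public static void main(String[] args) {
        PatternSketch first = begin("a").withSkipStrategy(SkipStrategy.SKIP_PAST_LAST_ROW);
        PatternSketch second = begin("b");
        // Each pattern reports its own strategy; no shared static state is involved.
        System.out.println(first.getName() + ": " + first.getSkipStrategy());
        System.out.println(second.getName() + ": " + second.getSkipStrategy());
    }
}
```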
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097349#comment-16097349 ]

Yueting Chen commented on FLINK-7169:
-------------------------------------

Hi [~dawidwys] [~dian.fu],

I have opened the pull request. It would be great if you could find time to review it. Thanks.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087046#comment-16087046 ]

ASF GitHub Bot commented on FLINK-7169:
---------------------------------------

GitHub user yestinchen opened a pull request:

    https://github.com/apache/flink/pull/4331

    [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yestinchen/flink FLINK-7169

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4331

commit 28f2e0ab2b6fd38864017fc64d2c76a65c8f7574
Author: Yestin <873915...@qq.com>
Date:   2017-07-14T08:41:51Z

    [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16086956#comment-16086956 ]

Yueting Chen commented on FLINK-7169:
-------------------------------------

[~dian.fu] [~dawidwys] Thanks. That's a very good idea. I am implementing this in the way you mentioned. I think it works.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085643#comment-16085643 ]

Dawid Wysakowicz commented on FLINK-7169:
-----------------------------------------

I have not had enough time to think through all the cases, but I also had something like what [~dian.fu] described in mind.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
    [ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085637#comment-16085637 ]

Dian Fu commented on FLINK-7169:
--------------------------------

Hi [~ychen],

Thanks a lot for working on this ticket. :)

For the API change, maybe it's better to add an API in {{Pattern}}, such as {{Pattern.setSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy)}}.

For the implementation of the {{AfterMatchSkipStrategy}}, I have a very rough thought. For example, for pattern {{a b*}}, if the skip strategy is {{AFTER MATCH SKIP TO FIRST b}}, we only add a new {{Start}} {{ComputationState}} once the first {{b}} is matched (rather than adding a new {{Start}} {{ComputationState}} as soon as the {{Start}} {{ComputationState}} is matched, which is the current strategy). If this is feasible, we don't need to keep track of the event order any more.

Thoughts?
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085451#comment-16085451 ]

Yueting Chen commented on FLINK-7169:

[~dawidwys] Thanks for the information. I noticed that, but the counter only keeps the reference count of the value, so it has some limitations.

For example, suppose we need to match a{2} against a1,a2,a3,a4 with the AFTER MATCH SKIP PAST LAST ROW option. Clearly, the first match is a1,a2. According to the SKIP option, the second match must start from a3. But all four events may have the same timestamp (1, for example). After the first match, we can only get the timestamp of the last matched event. Without knowing the exact logical order of the four events, it is impossible to skip to the right position, and the counter cannot help with that.

I suggest keeping a logical id for each event. The ids do not need to be continuous, but they must be monotonically increasing.
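The a{2} example above can be sketched in a few lines of Java. This is only an illustration of the logical-id idea under stated assumptions, not code from the CEP library: the names `LogicalOrderDemo`, `Event`, `IdAssigner` and `skipPastLastRow` are all hypothetical, and the buffer is a plain list rather than the shared buffer the thread discusses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names come from Flink's CEP library. */
class LogicalOrderDemo {

    /** An event that carries a logical id in addition to its timestamp. */
    static final class Event {
        final String name;
        final long timestamp;
        final long logicalId; // assigned on arrival, strictly increasing

        Event(String name, long timestamp, long logicalId) {
            this.name = name;
            this.timestamp = timestamp;
            this.logicalId = logicalId;
        }
    }

    /** Hands out strictly increasing ids; gaps between ids would also be fine. */
    static final class IdAssigner {
        private long next = 0;

        Event tag(String name, long timestamp) {
            return new Event(name, timestamp, next++);
        }
    }

    /** SKIP PAST LAST ROW: keep only events that arrived after the last matched one. */
    static List<Event> skipPastLastRow(List<Event> buffer, Event lastMatched) {
        List<Event> remaining = new ArrayList<>();
        for (Event e : buffer) {
            // Timestamps may all be equal, so the comparison uses the logical id.
            if (e.logicalId > lastMatched.logicalId) {
                remaining.add(e);
            }
        }
        return remaining;
    }

    /** Collects event names, for printing and comparison. */
    static List<String> names(List<Event> events) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.name);
        }
        return out;
    }

    public static void main(String[] args) {
        IdAssigner ids = new IdAssigner();
        List<Event> buffer = new ArrayList<>();
        // All four events share timestamp 1, as in the example above.
        for (String n : new String[] {"a1", "a2", "a3", "a4"}) {
            buffer.add(ids.tag(n, 1L));
        }
        Event lastMatched = buffer.get(1); // the first match of a{2} is a1,a2
        System.out.println(names(skipPastLastRow(buffer, lastMatched))); // prints [a3, a4]
    }
}
```

Because the comparison never looks at the timestamp, the resume position stays well defined even when every event has the same timestamp.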
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085409#comment-16085409 ]

Dawid Wysakowicz commented on FLINK-7169:

[~ychen] There is already such a variable, called counter.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085404#comment-16085404 ]

Yueting Chen commented on FLINK-7169:

[~jark] Thanks.

I just found out that events stored in the shared buffer may have the same timestamp, which makes it impossible to determine which event should be discarded. I think we need to introduce a new variable to keep the logical order of the events. I'll create a separate JIRA to address this issue.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085290#comment-16085290 ]

Jark Wu commented on FLINK-7169:

Thanks for your contribution. I moved this issue under FLINK-6935.
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085214#comment-16085214 ]

Yueting Chen commented on FLINK-7169:

I am going to work on this, but I would like to hear your thoughts first. [~dian.fu] [~dawidwys] [~kkl0u]

> Support AFTER MATCH SKIP function in CEP library API
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
> Issue Type: Task
> Components: CEP
> Reporter: Yueting Chen
> Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class that takes an
> AfterMatchSkipStrategy as a new parameter.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is
> how the CEP library currently behaves.
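To make the four options in the issue description concrete, here is a rough Java sketch that computes the row at which matching would resume for each of them. It is a toy model, not the proposed implementation: `AfterMatchSkipDemo`, the `Skip` enum and `resumeRow` are hypothetical names, rows are plain integer positions, and a match is assumed to be a non-empty map from pattern variable to the ascending row positions mapped to it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of the four AFTER MATCH SKIP options; not Flink code. */
class AfterMatchSkipDemo {

    enum Skip { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    /**
     * Returns the row position at which matching resumes.
     *
     * @param match pattern variable -> ascending row positions mapped to it (non-empty)
     * @param skip  the skip option
     * @param rpv   row pattern variable, only read for TO_FIRST and TO_LAST
     */
    static int resumeRow(Map<String, List<Integer>> match, Skip skip, String rpv) {
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        for (List<Integer> rows : match.values()) {
            for (int r : rows) {
                first = Math.min(first, r);
                last = Math.max(last, r);
            }
        }
        switch (skip) {
            case TO_NEXT_ROW:
                return first + 1; // row after the first row of the match
            case PAST_LAST_ROW:
                return last + 1; // row after the last row of the match
            case TO_FIRST:
                return match.get(rpv).get(0); // first row mapped to rpv
            case TO_LAST: {
                List<Integer> rows = match.get(rpv);
                return rows.get(rows.size() - 1); // last row mapped to rpv
            }
            default:
                throw new IllegalArgumentException("unknown option: " + skip);
        }
    }

    public static void main(String[] args) {
        // Assumed match of a pattern with variables a and b over rows 0..3.
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("a", Arrays.asList(0, 1));
        match.put("b", Arrays.asList(2, 3));

        System.out.println(resumeRow(match, Skip.TO_NEXT_ROW, null));   // prints 1
        System.out.println(resumeRow(match, Skip.PAST_LAST_ROW, null)); // prints 4
        System.out.println(resumeRow(match, Skip.TO_FIRST, "b"));       // prints 2
        System.out.println(resumeRow(match, Skip.TO_LAST, "b"));        // prints 3
    }
}
```

The sketch presupposes that rows have a well-defined order, which is the open question raised elsewhere in this thread for events that share a timestamp.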