[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16150436#comment-16150436
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4331


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Yueting Chen
> Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class that takes a new 
> parameter of type `AfterMatchSkipStrategy`.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is 
> how the CEP library currently behaves.
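
The four options above can be made concrete with a small sketch. The Java snippet below is illustrative only and is not part of the CEP API: `SkipOptionsSketch`, `MappedRow`, `Option`, and `resumeIndex` are hypothetical names. It assumes a match is given as an ordered list of rows, each tagged with the row pattern variable it was mapped to, and it computes the row index at which matching would resume under each option as worded in the description.

```java
import java.util.Arrays;
import java.util.List;

final class SkipOptionsSketch {

    /** A row of the current match: its index in the input and the variable it was mapped to. */
    static final class MappedRow {
        final int rowIndex;
        final String variable;

        MappedRow(int rowIndex, String variable) {
            this.rowIndex = rowIndex;
            this.variable = variable;
        }
    }

    /** Hypothetical names for the four AFTER MATCH SKIP options. */
    enum Option { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /** Returns the input row index at which pattern matching resumes after the given match. */
    static int resumeIndex(List<MappedRow> match, Option option, String rpv) {
        switch (option) {
            case SKIP_TO_NEXT_ROW:
                // The row after the first row of the current match.
                return match.get(0).rowIndex + 1;
            case SKIP_PAST_LAST_ROW:
                // The row after the last row of the current match.
                return match.get(match.size() - 1).rowIndex + 1;
            case SKIP_TO_FIRST:
                // The first row mapped to the row pattern variable.
                for (MappedRow row : match) {
                    if (row.variable.equals(rpv)) {
                        return row.rowIndex;
                    }
                }
                break;
            case SKIP_TO_LAST:
                // The last row mapped to the row pattern variable.
                for (int i = match.size() - 1; i >= 0; i--) {
                    if (match.get(i).variable.equals(rpv)) {
                        return match.get(i).rowIndex;
                    }
                }
                break;
        }
        throw new IllegalArgumentException("No row in the match is mapped to " + rpv);
    }

    public static void main(String[] args) {
        List<MappedRow> match = Arrays.asList(
            new MappedRow(3, "A"), new MappedRow(4, "B"),
            new MappedRow(5, "B"), new MappedRow(6, "C"));
        for (Option option : Option.values()) {
            System.out.println(option + " -> row " + resumeIndex(match, option, "B"));
        }
    }
}
```

For a match covering rows 3 to 6 mapped to A, B, B, C, with RPV set to B, the four options resume at rows 4, 7, 4, and 5 respectively.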



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147903#comment-16147903
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys Thanks for the reviews and comments. Please change the 
documentation during the merge.
And @kl0u, thanks for the reviews!




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147900#comment-16147900
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r136169035
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))
 
 
 
+### After Match Skip Strategy
+
+For a given pattern, there can be many successful matches as data stream 
flows. In order to control how to restart the match process after a successful 
match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. 
There're four types of skip strategies, listed as follows:
--- End diff --

That sounds good to me. Please change it during the merge. Thanks a lot.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146868#comment-16146868
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4331
  
I think you can merge it, @dawidwys. 
I have been following the evolution of this PR and I think it looks good ;). 
Thanks for the work, both @yestinchen and @dawidwys!




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146860#comment-16146860
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r135465794
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
@@ -511,4 +530,5 @@ object Pattern {
 */
   def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
--- End diff --

The version that takes an `AfterMatchSkipStrategy` is missing. I will change 
it during the merge, if you are OK with that.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146861#comment-16146861
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r135465365
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))
 
 
 
+### After Match Skip Strategy
+
+For a given pattern, there can be many successful matches as data stream 
flows. In order to control how to restart the match process after a successful 
match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. 
There're four types of skip strategies, listed as follows:
--- End diff --

That is not true. We do not restart the match process at all. It just 
controls which results are discarded. I will change it during merge, if you are 
ok with it.
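
To illustrate the point that the strategy only controls which results are discarded, here is a minimal sketch. It is not Flink's NFA code: `DiscardSketch` and `skipPastLastEvent` are hypothetical names, and the snippet assumes that all candidate matches have already been found and are ordered by their first event. Under a skip-past-last-event rule, a candidate is dropped when it starts at or before the last event of the previously kept match; nothing is re-matched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DiscardSketch {

    /**
     * Each candidate is {startIndex, endIndex} in the event stream, ordered by startIndex.
     * A candidate is kept only if it starts after the last event of the previously kept match.
     */
    static List<int[]> skipPastLastEvent(List<int[]> candidates) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1; // index of the last event of the most recently kept match
        for (int[] match : candidates) {
            if (match[0] > lastEnd) {
                kept.add(match);
                lastEnd = match[1];
            }
            // Otherwise the candidate overlaps a kept match and is simply discarded.
        }
        return kept;
    }

    public static void main(String[] args) {
        List<int[]> candidates = Arrays.asList(
            new int[] {0, 2}, new int[] {1, 3}, new int[] {3, 5}, new int[] {4, 6});
        for (int[] match : skipPastLastEvent(candidates)) {
            System.out.println(Arrays.toString(match));
        }
    }
}
```

With a no-skip strategy all four candidates would be emitted; with the rule sketched here only the matches over events 0 to 2 and 3 to 5 remain.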




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145363#comment-16145363
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
Hi @dawidwys, sorry for the late response. 
Thanks for your reviews. I have updated the test and the documentation. Please 
take a look when you have time. Thanks.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134871#comment-16134871
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r134170320
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -707,7 +789,7 @@ private boolean 
checkFilterCondition(ComputationState computationState, Itera
result.put(key, values);
}
 
-   for (T event: events) {
+   for (T event : events) {
--- End diff --

Unrelated change




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134870#comment-16134870
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r134170202
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -695,10 +778,9 @@ private boolean 
checkFilterCondition(ComputationState computationState, Itera
}
// for a given computation state, we cannot have more than one 
matching patterns.
Preconditions.checkState(paths.size() == 1);
-
-   Map result = new HashMap<>();
+   Map result = new LinkedHashMap<>();
Map path = paths.get(0);
-   for (String key: path.keySet()) {
+   for (String key : path.keySet()) {
--- End diff --

Unrelated change




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134869#comment-16134869
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r134170147
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -990,7 +1072,7 @@ public void serialize(NFA record, DataOutputView 
target) throws IOException {
boolean handleTimeout = source.readBoolean();
 
NFA nfa = new NFA<>(eventSerializer, windowTime, 
handleTimeout);
-   nfa.states = states;
+   nfa.addStates(states);
--- End diff --

This change can result in multiple start states being added. In any case, 
this is an unrelated change.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134873#comment-16134873
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4331
  
Still missing some tests:

- ensuring the `NFA#extractCurrentMatches` returns patterns in order.
- skip to first/last with `oneOrMore`

Docs missing




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134872#comment-16134872
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r134170217
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -557,8 +640,8 @@ private boolean isSelfIgnore(final StateTransition 
edge) {
 
if (computationState.isStartState()) {
int totalBranches = calculateIncreasingSelfState(
-   outgoingEdges.getTotalIgnoreBranches(),
-   outgoingEdges.getTotalTakeBranches());
+   outgoingEdges.getTotalIgnoreBranches(),
--- End diff --

Unrelated change




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16130168#comment-16130168
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133667042
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void 
discardComputationStatesAccordingToStrategy(Queue 
computationStates,
--- End diff --

The changes in the next commit are not enough. The `Map` type should 
also be changed in `SharedBuffer#extractPatterns`. Also, please provide tests for 
this behaviour (returning results in Pattern order), so that it cannot be 
changed by mistake.
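
The ordering concern can be shown with a minimal sketch. This is not the `SharedBuffer` code: `PatternOrderSketch` and `keysInInsertionOrder` are hypothetical names, and the pattern names are made up. It relies only on the documented behaviour of `java.util.LinkedHashMap`, which iterates keys in insertion order, whereas `java.util.HashMap` makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PatternOrderSketch {

    /** Inserts one entry per pattern name and returns the keys in iteration order. */
    static List<String> keysInInsertionOrder(String... patternNames) {
        // LinkedHashMap iterates in insertion order; a plain HashMap gives no such guarantee.
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String name : patternNames) {
            result.computeIfAbsent(name, key -> new ArrayList<>()).add("event-for-" + name);
        }
        return new ArrayList<>(result.keySet());
    }

    public static void main(String[] args) {
        System.out.println(keysInInsertionOrder("start", "middle", "end"));
    }
}
```

A regression test along these lines would assert that the returned keys equal the pattern names in the order in which the patterns were defined.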




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128921#comment-16128921
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133478539
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void 
discardComputationStatesAccordingToStrategy(Queue 
computationStates,
--- End diff --

You are absolutely right. Thanks for the tip.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128920#comment-16128920
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133478273
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,29 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
--- End diff --

We only need to check the skip strategy before compiling the `Pattern` into an 
`NFA`, so I think it is more reasonable to place the check here. We also need to 
verify that the `patternName` field in the `AfterMatchSkipStrategy` is a valid 
reference, which cannot be done easily in the `Pattern` class.
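
A minimal sketch of such a check, assuming the compiler has already collected the names of all patterns in the sequence. `SkipStrategyCheckSketch` and `checkSkipTarget` are hypothetical names and do not reproduce the `NFACompiler` code; a `null` target stands for a strategy that does not refer to a pattern.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class SkipStrategyCheckSketch {

    /**
     * Fails if the skip strategy refers to a pattern name that the pattern sequence does not define.
     * A null target means the strategy does not reference any pattern.
     */
    static void checkSkipTarget(String skipTargetName, Set<String> patternNames) {
        if (skipTargetName != null && !patternNames.contains(skipTargetName)) {
            throw new IllegalStateException(
                "The skip strategy refers to an unknown pattern: " + skipTargetName);
        }
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>(Arrays.asList("start", "middle", "end"));
        checkSkipTarget("middle", names); // valid reference, passes silently
        checkSkipTarget(null, names);     // strategy without a target, passes silently
        try {
            checkSkipTarget("missing", names);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check at compile time keeps it in one place, where the complete set of pattern names is known.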


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class that takes an
> additional parameter of type `AfterMatchSkipStrategy`.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?>
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy)
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is
> how the CEP library currently behaves.
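
To make the four options in the issue description concrete, the following Java sketch computes the row index at which matching would resume. It only illustrates the definitions quoted above. The class `SkipResumeSketch`, the `Skip` enum, and the representation of a match as a map from row pattern variable to ascending row indices are all assumptions made for this example and are not part of the CEP library.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the four AFTER MATCH SKIP options.
// A match maps each row pattern variable to the (ascending) row indices it matched.
final class SkipResumeSketch {

    enum Skip { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    // Returns the index of the row at which pattern matching resumes.
    static int resumeIndex(Map<String, List<Integer>> match, Skip skip, String rpv) {
        switch (skip) {
            case TO_NEXT_ROW:
                return firstRow(match) + 1;   // row after the first row of the match
            case PAST_LAST_ROW:
                return lastRow(match) + 1;    // row after the last row of the match
            case TO_FIRST:
                return rowsFor(match, rpv).get(0); // first row mapped to rpv
            case TO_LAST: {
                List<Integer> rows = rowsFor(match, rpv);
                return rows.get(rows.size() - 1);  // last row mapped to rpv
            }
            default:
                throw new IllegalArgumentException("Unknown option: " + skip);
        }
    }

    private static int firstRow(Map<String, List<Integer>> match) {
        Integer min = null;
        for (List<Integer> rows : match.values()) {
            for (int row : rows) {
                if (min == null || row < min) {
                    min = row;
                }
            }
        }
        if (min == null) {
            throw new IllegalArgumentException("Match contains no rows");
        }
        return min;
    }

    private static int lastRow(Map<String, List<Integer>> match) {
        Integer max = null;
        for (List<Integer> rows : match.values()) {
            for (int row : rows) {
                if (max == null || row > max) {
                    max = row;
                }
            }
        }
        if (max == null) {
            throw new IllegalArgumentException("Match contains no rows");
        }
        return max;
    }

    private static List<Integer> rowsFor(Map<String, List<Integer>> match, String rpv) {
        List<Integer> rows = match.get(rpv);
        if (rows == null || rows.isEmpty()) {
            throw new IllegalArgumentException("No rows mapped to variable: " + rpv);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("A", Arrays.asList(2));
        match.put("B", Arrays.asList(3, 4, 5));
        match.put("C", Arrays.asList(6));
        for (Skip skip : Skip.values()) {
            System.out.println(skip + " -> " + resumeIndex(match, skip, "B"));
        }
    }
}
```

For a match covering rows 2 to 6 with `B` mapped to rows 3, 4, and 5, the sketch resumes at row 3 (TO_NEXT_ROW), row 7 (PAST_LAST_ROW), row 3 (TO_FIRST `B`), and row 5 (TO_LAST `B`).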





[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16126650#comment-16126650
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133091608
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
+		Collection<Map<String, List<T>>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set<T> discardEvents = new HashSet<>();
+		switch(afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (List<T> eventList: resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List<ComputationState<T>> discardStates = new ArrayList<>();
+			for (ComputationState<T> computationState : computationStates) {
+				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
+				for (List<T> list: partialMatch.values()) {
+					for (T e: list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter()
+							);
+							discardStates.add(computationState);
--- End diff --

Yes, you are right. Thanks for pointing it out!



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125389#comment-16125389
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895649
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN.
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // pattern name to skip to
+   String patternName = null;
+
--- End diff --

This can be `private`.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125388#comment-16125388
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132896369
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
 ---
@@ -42,20 +43,21 @@
private static final long serialVersionUID = 3570542177814518158L;
 
public TimeoutKeyedCEPPatternOperator(
-   TypeSerializer inputSerializer,
-   boolean isProcessingTime,
-   TypeSerializer keySerializer,
-   NFACompiler.NFAFactory nfaFactory,
-   boolean migratingFromOldKeyedOperator,
-   EventComparator comparator) {
+   TypeSerializer inputSerializer,
--- End diff --

Please revert the unrelated formatting changes.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125387#comment-16125387
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132894559
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -96,7 +97,8 @@
} else {
final NFAFactoryCompiler nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
nfaFactoryCompiler.compileFactory();
-   return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+   return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(),
+   nfaFactoryCompiler.getStates(), timeoutHandling);
}
--- End diff --

Unrelated formatting change. Please revert.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125386#comment-16125386
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132896299
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
 ---
@@ -42,28 +43,29 @@
private static final long serialVersionUID = 5328573789532074581L;
 
public KeyedCEPPatternOperator(
-   TypeSerializer inputSerializer,
-   boolean isProcessingTime,
-   TypeSerializer keySerializer,
-   NFACompiler.NFAFactory nfaFactory,
-   boolean migratingFromOldKeyedOperator,
-   EventComparator comparator) {
+   TypeSerializer inputSerializer,
--- End diff --

Please revert the unrelated formatting changes.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125384#comment-16125384
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895639
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN.
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
--- End diff --

This can be `private`.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125385#comment-16125385
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132896150
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -120,13 +121,16 @@
 
private final EventComparator comparator;
 
+   final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
public AbstractKeyedCEPPatternOperator(
-   final TypeSerializer inputSerializer,
--- End diff --

Please revert the formatting changes (tabs).




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125378#comment-16125378
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895143
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,29 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+	private void checkPatternSkipStrategy() {
+		if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+			afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+			Pattern pattern = currentPattern;
+			while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+				if (pattern.getPrevious() == null) {
+					break;
+				} else {
+					pattern = pattern.getPrevious();
+				}
+			}
+
--- End diff --

The above can become:
```
while (pattern.getPrevious() != null && !pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
	pattern = pattern.getPrevious();
}
```
right?
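
The two loop shapes can be compared side by side in a small, self-contained Java sketch. `PatternChainWalk` and its `Node` class are hypothetical stand-ins for the `Pattern` chain (only `name` and `previous` are modelled), so this shows the control flow only and is not the actual `NFACompiler` code.

```java
// Hypothetical sketch comparing the original loop with the suggested rewrite.
final class PatternChainWalk {

    // Minimal stand-in for a pattern that links back to its predecessor.
    static final class Node {
        final String name;
        final Node previous;

        Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }
    }

    // Loop shape from the diff: explicit break when the head of the chain is reached.
    static Node walkOriginal(Node start, String target) {
        Node pattern = start;
        while (!pattern.name.equals(target)) {
            if (pattern.previous == null) {
                break;
            } else {
                pattern = pattern.previous;
            }
        }
        return pattern;
    }

    // Suggested shape: both stop conditions folded into the loop condition.
    static Node walkSimplified(Node start, String target) {
        Node pattern = start;
        while (pattern.previous != null && !pattern.name.equals(target)) {
            pattern = pattern.previous;
        }
        return pattern;
    }

    public static void main(String[] args) {
        Node a = new Node("a", null);
        Node b = new Node("b", a);
        Node c = new Node("c", b);
        System.out.println(walkOriginal(c, "b").name + " " + walkSimplified(c, "b").name);
        System.out.println(walkOriginal(c, "x").name + " " + walkSimplified(c, "x").name);
    }
}
```

Both versions stop either on the node whose name equals the target or on the head of the chain, so a caller still has to compare the returned node's name with the target to detect a missing pattern.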




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125383#comment-16125383
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132898814
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+   private void 
discardComputationStatesAccordingToStrategy(Queue 
computationStates,
--- End diff --

This method is very fragile as it makes the assumption that the different 
patterns appear in the match in the order that they were declared in the 
pattern declaration, e.g. for a pattern `A -> B -> C` it assumes that the 
partial matches will appear in the order [`matches for A`, `matches for B`, 
`matches for C`]. This is *NOT* correct as we use a `HashMap` which does not 
provide any ordering guarantees. 

A quick fix would be to use a `LinkedHashMap` as the return type for the 
`extractCurrentMatches` and put the results there in the correct order.
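
A short Java sketch of why the ordering matters: the discard logic walks the map entries until it reaches the target pattern, so it is only correct if iteration follows declaration order. `OrderedMatchSketch` and its helper names are hypothetical. The only library behaviour relied on is that `java.util.LinkedHashMap` iterates in insertion order, whereas `HashMap` makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: order-dependent discard logic over a match map.
final class OrderedMatchSketch {

    // Collects all events of the patterns that precede the target pattern in
    // iteration order. Correct only if the map iterates in declaration order.
    static <T> Set<T> eventsBefore(Map<String, List<T>> match, String target) {
        Set<T> discard = new HashSet<>();
        for (Map.Entry<String, List<T>> entry : match.entrySet()) {
            if (entry.getKey().equals(target)) {
                break;
            }
            discard.addAll(entry.getValue());
        }
        return discard;
    }

    // Returns the keys in the order in which the map iterates over them.
    static List<String> keyOrder(Map<String, ?> match) {
        return new ArrayList<>(match.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so entries come back as
        // start, middle, end regardless of the keys' hash codes.
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList(1));
        match.put("middle", Arrays.asList(2, 3));
        match.put("end", Arrays.asList(4));

        System.out.println(keyOrder(match));
        System.out.println(eventsBefore(match, "end"));
    }
}
```

With a plain `HashMap` the same calls could visit `end` before `start`, in which case nothing would be discarded even though events 1, 2, and 3 precede the target.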




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125381#comment-16125381
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132894611
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -117,9 +119,11 @@
private Map, Boolean> firstOfLoopMap = new 
HashMap<>();
private Pattern currentPattern;
private Pattern followingPattern;
+   private AfterMatchSkipStrategy afterMatchSkipStrategy;
--- End diff --

This can be `final`.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125390#comment-16125390
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895822
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN.
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // pattern name to skip to
+   String patternName = null;
+
+   /**
+* Skip to first *pattern*.
+* @param patternName the pattern name to skip to
+* @return
--- End diff --

The `@return` javadoc is missing on all of the methods. Also add a brief 
description of what each one implies.
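
For illustration, a documented factory method could look roughly like the sketch below. `SkipStrategyDocSketch` and its `Kind` enum are hypothetical and do not reproduce the class from the pull request. The method descriptions paraphrase the issue text, and the fields are `private` as suggested in the other review comments.

```java
// Hypothetical sketch of factory methods with complete Javadoc.
final class SkipStrategyDocSketch {

    enum Kind { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final Kind kind;
    private final String patternName;

    private SkipStrategyDocSketch(Kind kind, String patternName) {
        this.kind = kind;
        this.patternName = patternName;
    }

    /**
     * Resumes matching at the first event that is mapped to the given pattern.
     *
     * @param patternName the name of the pattern to skip to
     * @return a strategy of kind {@code SKIP_TO_FIRST} that targets {@code patternName}
     */
    static SkipStrategyDocSketch skipToFirst(String patternName) {
        return new SkipStrategyDocSketch(Kind.SKIP_TO_FIRST, patternName);
    }

    /**
     * Resumes matching at the event after the last event of the current match.
     *
     * @return a strategy of kind {@code SKIP_PAST_LAST_EVENT} without a target pattern
     */
    static SkipStrategyDocSketch skipPastLastEvent() {
        return new SkipStrategyDocSketch(Kind.SKIP_PAST_LAST_EVENT, null);
    }

    /** @return the kind of this strategy */
    Kind getKind() {
        return kind;
    }

    /** @return the target pattern name, or {@code null} if the kind needs none */
    String getPatternName() {
        return patternName;
    }

    public static void main(String[] args) {
        SkipStrategyDocSketch strategy = skipToFirst("middle");
        System.out.println(strategy.getKind() + " " + strategy.getPatternName());
    }
}
```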




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125377#comment-16125377
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895590
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN.
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
--- End diff --

Please add a `serialVersionUID`, e.g.: 
`private static final long serialVersionUID = -3601462998929198774L;`
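
As background for this request: without an explicit `serialVersionUID`, the JVM derives one from the class structure, so an otherwise harmless change to the class can make previously serialized instances fail to deserialize. The following is a minimal sketch of the idea only; `SkipStrategyHolder` and `roundTrip` are hypothetical names and the value `1L` is arbitrary, not the one proposed for the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SkipStrategyHolder implements Serializable {

    // Pinned explicitly so the serialized form does not depend on a compiler-derived value.
    private static final long serialVersionUID = 1L;

    private final String patternName;

    SkipStrategyHolder(String patternName) {
        this.patternName = patternName;
    }

    String getPatternName() {
        return patternName;
    }

    /** Serializes the given instance to bytes and reads it back. */
    static SkipStrategyHolder roundTrip(SkipStrategyHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SkipStrategyHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```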




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125379#comment-16125379
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895280
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,29 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
--- End diff --

Could this whole check go in the `Pattern` class? This will make the code 
clearer.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125382#comment-16125382
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895977
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN.
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+    // default strategy
+    SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+    // pattern name to skip to
+    String patternName = null;
+
+    /**
+     * Skip to first *pattern*.
+     * @param patternName the pattern name to skip to
+     * @return
+     */
+    public static AfterMatchSkipStrategy skipToFirst(String patternName) {
+        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
+    }
+
+    /**
+     * Skip to last *pattern*.
+     * @param patternName the pattern name to skip to
+     * @return
+     */
+    public static AfterMatchSkipStrategy skipToLast(String patternName) {
+        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
+    }
+
+    /**
+     * Skip past last event.
+     * @return
+     */
+    public static AfterMatchSkipStrategy skipPastLastEvent() {
+        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
+    }
+
+    /**
+     * Skip to next event.
+     * @return
+     */
+    public static AfterMatchSkipStrategy skipToNextEvent() {
+        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
+    }
+
+    private AfterMatchSkipStrategy(SkipStrategy strategy) {
+        this(strategy, null);
+    }
+
+    private AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
+        if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
+            throw new IllegalArgumentException("The patternName field can not be empty when SkipStrategy is " + strategy);
+        }
+        this.strategy = strategy;
+        this.patternName = patternName;
+    }
+
+    public SkipStrategy getStrategy() {
+        return strategy;
+    }
+
+    public String getPatternName() {
+        return patternName;
+    }
+
+    @Override
+    public String toString() {
--- End diff --

If `patternName == null` then there is nothing to print, so it would be 
nice to adjust the `toString()` accordingly.
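
One way to read this suggestion, as a sketch only: append the pattern name to the string only when it is set. `SkipStrategyToStringSketch` is a hypothetical stand-in for the class under review, the strategy is modelled as a plain `String`, and the output format is an assumption rather than what the PR ended up with.

```java
final class SkipStrategyToStringSketch {

    private final String strategy;
    private final String patternName; // null for strategies that take no pattern name

    SkipStrategyToStringSketch(String strategy, String patternName) {
        this.strategy = strategy;
        this.patternName = patternName;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("AfterMatchSkipStrategy{").append(strategy);
        if (patternName != null) {
            // Only mention the pattern name when there is one to print.
            sb.append(", patternName=").append(patternName);
        }
        return sb.append('}').toString();
    }
}
```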



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125392#comment-16125392
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132896183
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -120,13 +121,16 @@
 
private final EventComparator comparator;
 
+   final AfterMatchSkipStrategy afterMatchSkipStrategy;
--- End diff --

This can be `protected`.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125380#comment-16125380
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132895583
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
--- End diff --

You could also add a brief description in the class javadoc for each of the 
strategies.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125391#comment-16125391
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132897107
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -158,9 +158,9 @@
private boolean nfaChanged;
 
public NFA(
-   final TypeSerializer eventSerializer,
-   final long windowTime,
-   final boolean handleTimeout) {
+   final TypeSerializer eventSerializer,
--- End diff --

Please revert the unrelated formatting change.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16125189#comment-16125189
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132867483
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
return Tuple2.of(result, timeoutResult);
}
 
+    private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
+        Collection<Map<String, List<T>>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+        Set<T> discardEvents = new HashSet<>();
+        switch(afterMatchSkipStrategy.getStrategy()) {
+            case SKIP_TO_LAST:
+                for (Map<String, List<T>> resultMap: matchedResult) {
+                    for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+                            discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+                            break;
+                        } else {
+                            discardEvents.addAll(keyMatches.getValue());
+                        }
+                    }
+                }
+                break;
+            case SKIP_TO_FIRST:
+                for (Map<String, List<T>> resultMap: matchedResult) {
+                    for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+                            break;
+                        } else {
+                            discardEvents.addAll(keyMatches.getValue());
+                        }
+                    }
+                }
+                break;
+            case SKIP_PAST_LAST_EVENT:
+                for (Map<String, List<T>> resultMap: matchedResult) {
+                    for (List<T> eventList: resultMap.values()) {
+                        discardEvents.addAll(eventList);
+                    }
+                }
+                break;
+        }
+        if (!discardEvents.isEmpty()) {
+            List<ComputationState<T>> discardStates = new ArrayList<>();
+            for (ComputationState<T> computationState : computationStates) {
+                Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
+                for (List<T> list: partialMatch.values()) {
+                    for (T e: list) {
+                        if (discardEvents.contains(e)) {
+                            // discard the computation state.
+                            eventSharedBuffer.release(
+                                NFAStateNameHandler.getOriginalNameFromInternal(
+                                    computationState.getState().getName()),
+                                computationState.getEvent(),
+                                computationState.getTimestamp(),
+                                computationState.getCounter()
+                            );
+                            discardStates.add(computationState);
--- End diff --

We should add a `break;` after `discardStates.add(computationState);`, right?
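
A sketch of the concern, under the assumption that each computation state should be discarded at most once: in the quoted loop, a state whose partial match contains two discarded events would be released and added twice. Because the check sits inside two nested loops, a plain `break` only leaves the innermost one, so the sketch below uses a labelled `continue` instead. `DiscardSketch` and `statesToDiscard` are hypothetical names, and states are represented only by their position in the input list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DiscardSketch {

    /**
     * Returns the positions of the partial matches that contain at least one discarded
     * event. Each position is reported once, however many of its events are discarded.
     */
    static <T> List<Integer> statesToDiscard(
            List<Map<String, List<T>>> partialMatches, Set<T> discardEvents) {
        List<Integer> discard = new ArrayList<>();
        nextState:
        for (int i = 0; i < partialMatches.size(); i++) {
            for (List<T> events : partialMatches.get(i).values()) {
                for (T event : events) {
                    if (discardEvents.contains(event)) {
                        discard.add(i);
                        // Leave both inner loops so this state is not added again.
                        continue nextState;
                    }
                }
            }
        }
        return discard;
    }
}
```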



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16124622#comment-16124622
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
Thanks for your review, @dawidwys! I'll update the docs in the following 
commits.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16123249#comment-16123249
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132669361
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -859,7 +940,7 @@ public boolean apply(@Nullable State input) {
 */
public static final class NFASerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
--- End diff --

Please revert the changes in the serializer.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121220#comment-16121220
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132386948
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+    private void checkPatternSkipStrategy() {
+        AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+        if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+            afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+            Pattern pattern = currentPattern;
+            while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                pattern = pattern.getPrevious();
+            }
+            // pattern name match check.
+            if (pattern == null) {
+                throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                    "can not be found in the given Pattern");
+            } else {
+                // can not be used with optional states.
+                if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                    throw new MalformedPatternException("the AfterMatchSkipStrategy "
+                        + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
+                }
+            }
+
+            // start position check.
+            if (pattern.getPrevious() == null) {
--- End diff --

Great, I'll just remove all of those optional-state checks.
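
A side note on the name lookup in the quoted diff: the `while` loop dereferences `pattern` before the later `pattern == null` check can run, so an unknown name would presumably surface as a `NullPointerException` rather than the intended `MalformedPatternException`. The sketch below shows the same backward walk with a null guard. `PatternLookupSketch`, `Node`, and `findByName` are hypothetical stand-ins; `Node` only mimics the `getName()` / `getPrevious()` chain and is not Flink's `Pattern` class.

```java
final class PatternLookupSketch {

    /** Minimal stand-in for a pattern that knows its name and its predecessor. */
    static final class Node {
        final String name;
        final Node previous;

        Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }
    }

    /**
     * Walks back from the last node and returns the node with the given name,
     * or null if no node in the chain has that name.
     */
    static Node findByName(Node last, String target) {
        Node current = last;
        while (current != null && !current.name.equals(target)) {
            current = current.previous;
        }
        return current;
    }
}
```

With this shape, the caller can turn a `null` result into whatever error the compiler is meant to report.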




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16121203#comment-16121203
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132384194
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+    private void checkPatternSkipStrategy() {
+        AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+        if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+            afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+            Pattern pattern = currentPattern;
+            while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                pattern = pattern.getPrevious();
+            }
+            // pattern name match check.
+            if (pattern == null) {
+                throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                    "can not be found in the given Pattern");
+            } else {
+                // can not be used with optional states.
+                if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                    throw new MalformedPatternException("the AfterMatchSkipStrategy "
+                        + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
+                }
+            }
+
+            // start position check.
+            if (pattern.getPrevious() == null) {
--- End diff --

I personally see no reason for semantics that throw a RuntimeException; I can't 
think of any use case for it. Let's finish this PR without the switch and the 
exceptions, and open a separate JIRA for the switch, ideally with some use cases 
for those semantics, so we can agree on it and see whether anyone needs it.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120938#comment-16120938
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132349108
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String 
patternName) {
--- End diff --

Good idea, I like that.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120936#comment-16120936
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132348950
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+   private void checkPatternSkipStrategy() {
+   AfterMatchSkipStrategy afterMatchSkipStrategy = 
currentPattern.getAfterMatchSkipStrategy();
+   if (afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+   afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+   Pattern pattern = currentPattern;
+   while 
(!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+   pattern = pattern.getPrevious();
+   }
+   // pattern name match check.
+   if (pattern == null) {
+   throw new 
MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy 
" +
+   "can not be found in the given 
Pattern");
+   } else {
+   // can not be used with optional states.
+   if 
(pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+   throw new 
MalformedPatternException("the AfterMatchSkipStrategy "
+   + 
afterMatchSkipStrategy.getStrategy() + " can not be used with optional 
pattern");
+   }
+   }
+
+   // start position check.
+   if (pattern.getPrevious() == null) {
--- End diff --

I agree with you that the fallback approach is much easier to understand 
and maintain.
If we discard nothing, the actual semantics are to use SKIP_TO_NEXT_EVENT for 
the next match process. But that changes the matching semantics, which 
may lead to incorrect results, so I think users should be aware of what happens. 
My original thought was to add a configuration switch that lets the user choose 
between throwing an exception and falling back to a default skip strategy.
Do you have any ideas about that?
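
One way to picture the configuration switch mentioned above is the following sketch. Everything in it is hypothetical: `SkipFallbackDemo`, `OnInvalidSkip`, and `resolveResumeIndex` do not exist in Flink, and the rule for when a skip "cannot be applied" is a simplifying assumption (the target event is missing, or it is the first event of the match).

```java
/** Illustrative only: the switch and its names are hypothetical, not an existing Flink option. */
final class SkipFallbackDemo {

    enum OnInvalidSkip { THROW, FALL_BACK_TO_NEXT_EVENT }

    /**
     * Resolves where matching resumes after a match that started at firstIndex.
     * skipTarget is the index of the event the strategy points at, or -1 if the
     * named pattern has no event in this match (e.g. an absent optional pattern).
     */
    static int resolveResumeIndex(int firstIndex, int skipTarget, OnInvalidSkip mode) {
        // Skipping to the start of the match (or to nothing) would discard nothing.
        boolean cannotSkip = skipTarget < 0 || skipTarget <= firstIndex;
        if (!cannotSkip) {
            return skipTarget;
        }
        if (mode == OnInvalidSkip.THROW) {
            throw new IllegalStateException("skip strategy cannot be applied to this match");
        }
        return firstIndex + 1; // same effect as SKIP_TO_NEXT_EVENT
    }
}
```

With `THROW` the user is told immediately that the strategy could not be applied; with `FALL_BACK_TO_NEXT_EVENT` the job keeps running, at the cost of matching semantics that differ from what was configured.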




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119867#comment-16119867
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132174353
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends 
StreamingMultipleProgramsTestBase {
--- End diff --

Switch the test to be similar to `NFAITCase` so that it tests a smaller part of 
the library.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119869#comment-16119869
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175179
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends 
StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   private String lateEventPath;
+   private String expectedLateEvents;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   expected = "";
+
+   lateEventPath = tempFolder.newFile().toURI().toString();
+   expectedLateEvents = "";
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   compareResultsByLinesInMemory(expectedLateEvents, 
lateEventPath);
+   }
+
+   private PatternSelectFunction newIdSelectFunction(String 
... names) {
+   return new PatternSelectFunction() {
+
+   @Override
+   public String select(Map pattern) {
+   StringBuilder builder = new StringBuilder();
+   for (String name: names) {
+   for (Event e : pattern.get(name)) {
+   
builder.append(e.getId()).append(",");
+   }
+   }
+   return builder.toString();
+   }
+   };
+   }
+
+   @Test
+   public void testSkipToNext() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new Event(1, "a", 0.0),
+   new Event(2, "a", 0.0),
+   new Event(3, "a", 0.0),
+   new Event(4, "a", 0.0),
+   new Event(5, "a", 0.0),
+   new Event(6, "a", 0.0)
+   );
+   Pattern pattern = Pattern.begin("start",
+   new 
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+   .where(new SimpleCondition() {
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).times(3);
+   DataStream result = CEP.pattern(input, 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119853#comment-16119853
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132173777
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+   private void checkPatternSkipStrategy() {
+   AfterMatchSkipStrategy afterMatchSkipStrategy = 
currentPattern.getAfterMatchSkipStrategy();
+   if (afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+   afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+   Pattern pattern = currentPattern;
+   while 
(!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+   pattern = pattern.getPrevious();
+   }
+   // pattern name match check.
+   if (pattern == null) {
+   throw new 
MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy 
" +
+   "can not be found in the given 
Pattern");
+   } else {
+   // can not be used with optional states.
+   if 
(pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
--- End diff --

We could also allow this. See comment below.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119860#comment-16119860
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175300
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends 
StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   private String lateEventPath;
+   private String expectedLateEvents;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   expected = "";
+
+   lateEventPath = tempFolder.newFile().toURI().toString();
+   expectedLateEvents = "";
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   compareResultsByLinesInMemory(expectedLateEvents, 
lateEventPath);
+   }
+
+   private PatternSelectFunction newIdSelectFunction(String 
... names) {
+   return new PatternSelectFunction() {
+
+   @Override
+   public String select(Map pattern) {
+   StringBuilder builder = new StringBuilder();
+   for (String name: names) {
+   for (Event e : pattern.get(name)) {
+   
builder.append(e.getId()).append(",");
+   }
+   }
+   return builder.toString();
+   }
+   };
+   }
+
+   @Test
+   public void testSkipToNext() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new Event(1, "a", 0.0),
+   new Event(2, "a", 0.0),
+   new Event(3, "a", 0.0),
+   new Event(4, "a", 0.0),
+   new Event(5, "a", 0.0),
+   new Event(6, "a", 0.0)
+   );
+   Pattern pattern = Pattern.begin("start",
+   new 
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+   .where(new SimpleCondition() {
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).times(3);
+   DataStream result = CEP.pattern(input, 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119861#comment-16119861
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132137487
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -157,15 +157,28 @@
 */
private boolean nfaChanged;
 
+   /**
+* Store the skip strategy.
+*/
+   private AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+   public NFA(
--- End diff --

I don't see the point of creating another constructor.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119864#comment-16119864
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132136797
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String 
patternName) {
+   if (patternName == null && (strategy == 
SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
+   throw new IllegalArgumentException("the patternName 
field can not be empty when SkipStrategy is " + strategy);
+   }
+   this.strategy = strategy;
+   this.patternName = patternName;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getPatternName() {
+   return patternName;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", patternName=" + patternName +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_EVENT,
+   SKIP_PAST_LAST_EVENT,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the 
configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119855#comment-16119855
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132173699
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+   private void checkPatternSkipStrategy() {
+   AfterMatchSkipStrategy afterMatchSkipStrategy = 
currentPattern.getAfterMatchSkipStrategy();
+   if (afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+   afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+   Pattern pattern = currentPattern;
+   while 
(!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+   pattern = pattern.getPrevious();
+   }
+   // pattern name match check.
+   if (pattern == null) {
+   throw new 
MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy 
" +
+   "can not be found in the given 
Pattern");
+   } else {
+   // can not be used with optional states.
+   if 
(pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+   throw new 
MalformedPatternException("the AfterMatchSkipStrategy "
+   + 
afterMatchSkipStrategy.getStrategy() + " can not be used with optional 
pattern");
+   }
+   }
+
+   // start position check.
+   if (pattern.getPrevious() == null) {
--- End diff --

I had a second thought on all those cases, and I think you had a good point 
with one of the previous proposals: just fall back to 
`SKIP_TO_NEXT_EVENT`.

Maybe let's allow all those situations, but if within a match we cannot 
discard anything (e.g. the skip is to the first pattern, or the optional event 
is not present), then we discard nothing. The advantage, in my opinion, is that 
this is easier to understand and maintain than handling all those cases 
separately. (As a side note, the exceptions in SQL in all those cases are, in 
my opinion, just an implementation detail, as they keep a single partial match 
at a time.) What do you think?
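
The "discard nothing" rule can be sketched as a filter over partial matches. This is an assumption-laden illustration, not the NFA code: `DiscardDemo` and `survivors` are hypothetical names, and each partial match is reduced to the input index of its first event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

/** Illustrative only: a simplified model of which partial matches survive a skip. */
final class DiscardDemo {

    /**
     * partialMatchStarts holds the input index of the first event of each partial match.
     * skipTo is the index to skip to, or empty when it cannot be determined
     * (for example because the optional event is not present in the match).
     */
    static List<Integer> survivors(List<Integer> partialMatchStarts, OptionalInt skipTo) {
        if (!skipTo.isPresent()) {
            return new ArrayList<>(partialMatchStarts); // cannot skip: discard nothing
        }
        List<Integer> kept = new ArrayList<>();
        for (int start : partialMatchStarts) {
            if (start >= skipTo.getAsInt()) {
                kept.add(start); // partial matches starting before the target are discarded
            }
        }
        return kept;
    }
}
```

In this model no special-case exceptions are needed: an undeterminable skip target simply leaves every partial match in place.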


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is 
> how the CEP library currently behaves.
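
The four options in the issue description differ only in where matching resumes. The sketch below computes that resume position for one match; it is an illustration under assumptions, not Flink code. `SkipResumeSketch`, `Row`, `Skip`, and `resumeIndex` are hypothetical names, and rows are identified by their position in the input.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the four AFTER MATCH SKIP options; not Flink code. */
final class SkipResumeSketch {

    enum Skip { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    /** One matched row: its position in the input and the pattern variable it was mapped to. */
    static final class Row {
        final int index;
        final String variable;

        Row(int index, String variable) {
            this.index = index;
            this.variable = variable;
        }
    }

    /** Returns the input position at which matching resumes, or -1 if rpv is absent from the match. */
    static int resumeIndex(List<Row> match, Skip skip, String rpv) {
        switch (skip) {
            case TO_NEXT_ROW:
                // Row after the first row of the current match.
                return match.get(0).index + 1;
            case PAST_LAST_ROW:
                // Row after the last row of the current match.
                return match.get(match.size() - 1).index + 1;
            case TO_FIRST:
                for (Row row : match) {
                    if (row.variable.equals(rpv)) {
                        return row.index;
                    }
                }
                return -1;
            case TO_LAST:
                int last = -1;
                for (Row row : match) {
                    if (row.variable.equals(rpv)) {
                        last = row.index;
                    }
                }
                return last;
            default:
                throw new IllegalArgumentException("unknown option: " + skip);
        }
    }

    public static void main(String[] args) {
        // A match "a b b c" over input rows 0..3, with RPV = "b".
        List<Row> match = Arrays.asList(
            new Row(0, "a"), new Row(1, "b"), new Row(2, "b"), new Row(3, "c"));
        for (Skip skip : Skip.values()) {
            System.out.println(skip + " -> " + resumeIndex(match, skip, "b"));
        }
    }
}
```

Returning -1 for an absent variable is a choice made for this sketch only; how a missing RPV should be handled is exactly what the review discussion debates.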



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119857#comment-16119857
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132166483
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -157,15 +157,28 @@
 */
private boolean nfaChanged;
 
+   /**
+* Store the skip strategy.
+*/
+   private AfterMatchSkipStrategy afterMatchSkipStrategy;
--- End diff --

I would not add it to `NFA`. I think it is static data that should not be 
serialized each time with `NFA`. We already have problems removing `State`s, 
`Condition`s and `handleTimeout` from what is serialized within `NFA`. It also 
makes maintaining serialization compatibility hard. How about adding it to the 
`NFAFactory` and passing it to `NFA#process` (it is the only place where it is 
needed)?

I know it will require passing it in multiple places, e.g. 
`KeyedCEPPatternOperator` and `TimeoutKeyedCEPPatternOperator`, but it will be 
easier after #4320.
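
One way to read this suggestion is sketched below: the checkpointed state holds only mutable data, while the strategy lives in the factory and is handed to `process` on each call. This is a simplified stand-in, not the Flink classes; `StrategyOutsideStateSketch`, `NfaState`, `Factory`, and the three-event "match" rule are assumptions made for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of keeping static configuration out of serialized state; not Flink code. */
final class StrategyOutsideStateSketch {

    enum Skip { TO_NEXT_EVENT, PAST_LAST_EVENT }

    /** Stand-in for the per-key state that is checkpointed; it holds no configuration. */
    static final class NfaState implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Integer> buffered = new ArrayList<>();

        /** The strategy arrives as an argument, so it is never part of the serialized form. */
        void process(int event, Skip strategy) {
            buffered.add(event);
            if (buffered.size() == 3) { // toy rule: three buffered events form a match
                if (strategy == Skip.PAST_LAST_EVENT) {
                    buffered.clear();   // drop every event of the match
                } else {
                    buffered.remove(0); // drop only the first event of the match
                }
            }
        }
    }

    /** Stand-in for the factory: it holds the static configuration once. */
    static final class Factory implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Skip strategy;

        Factory(Skip strategy) {
            this.strategy = strategy;
        }

        NfaState createState() {
            return new NfaState();
        }

        Skip getStrategy() {
            return strategy;
        }
    }
}
```

An operator would then call something like `state.process(event, factory.getStrategy())`, so changing the strategy later would not affect the serialized layout of `NfaState`.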




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119863#comment-16119863
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175333
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends 
StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   private String lateEventPath;
+   private String expectedLateEvents;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   expected = "";
+
+   lateEventPath = tempFolder.newFile().toURI().toString();
+   expectedLateEvents = "";
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   compareResultsByLinesInMemory(expectedLateEvents, 
lateEventPath);
+   }
+
+   private PatternSelectFunction newIdSelectFunction(String 
... names) {
+   return new PatternSelectFunction() {
+
+   @Override
+   public String select(Map pattern) {
+   StringBuilder builder = new StringBuilder();
+   for (String name: names) {
+   for (Event e : pattern.get(name)) {
+   
builder.append(e.getId()).append(",");
+   }
+   }
+   return builder.toString();
+   }
+   };
+   }
+
+   @Test
+   public void testSkipToNext() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new Event(1, "a", 0.0),
+   new Event(2, "a", 0.0),
+   new Event(3, "a", 0.0),
+   new Event(4, "a", 0.0),
+   new Event(5, "a", 0.0),
+   new Event(6, "a", 0.0)
+   );
+   Pattern pattern = Pattern.begin("start",
+   new 
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+   .where(new SimpleCondition() {
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).times(3);
+   DataStream result = CEP.pattern(input, 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119862#comment-16119862
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132139143
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -322,6 +336,64 @@ public void resetNFAChanged() {
 
}
 
+   Set discardEvents = new HashSet<>();
+   switch(afterMatchSkipStrategy.getStrategy()) {
+   case SKIP_TO_LAST:
+   for (Map resultMap: result) {
+   boolean matched = false;
+   for (String key: resultMap.keySet()) {
+   if 
(key.equals(afterMatchSkipStrategy.getPatternName())) {
+   matched = true;
+   
discardEvents.addAll(resultMap.get(key).subList(0, resultMap.get(key).size() - 
1));
+   } else if (!matched) {
+   
discardEvents.addAll(resultMap.get(key));
+   }
+   }
+   }
+   break;
+   case SKIP_TO_FIRST:
+   for (Map resultMap: result) {
--- End diff --

Similar to the above.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119859#comment-16119859
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132134129
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String 
patternName) {
+   if (patternName == null && (strategy == 
SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
+   throw new IllegalArgumentException("the patternName 
field can not be empty when SkipStrategy is " + strategy);
+   }
+   this.strategy = strategy;
+   this.patternName = patternName;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getPatternName() {
+   return patternName;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", patternName=" + patternName +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_EVENT,
+   SKIP_PAST_LAST_EVENT,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the 
configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119868#comment-16119868
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175418
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends 
StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   private String lateEventPath;
+   private String expectedLateEvents;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   expected = "";
+
+   lateEventPath = tempFolder.newFile().toURI().toString();
+   expectedLateEvents = "";
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   compareResultsByLinesInMemory(expectedLateEvents, 
lateEventPath);
+   }
+
+   private PatternSelectFunction newIdSelectFunction(String 
... names) {
+   return new PatternSelectFunction() {
+
+   @Override
+   public String select(Map pattern) {
+   StringBuilder builder = new StringBuilder();
+   for (String name: names) {
+   for (Event e : pattern.get(name)) {
+   
builder.append(e.getId()).append(",");
+   }
+   }
+   return builder.toString();
+   }
+   };
+   }
+
+   @Test
+   public void testSkipToNext() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStream input = env.fromElements(
+   new Event(1, "a", 0.0),
+   new Event(2, "a", 0.0),
+   new Event(3, "a", 0.0),
+   new Event(4, "a", 0.0),
+   new Event(5, "a", 0.0),
+   new Event(6, "a", 0.0)
+   );
+   Pattern pattern = Pattern.begin("start",
+   new 
AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+   .where(new SimpleCondition() {
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).times(3);
+   DataStream result = CEP.pattern(input, 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119851#comment-16119851
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132132564
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String 
patternName) {
--- End diff --

I would change the constructors into factory methods. I think that way the 
requirements for `patternName` will be more visible and the code will be 
less bloated.

I have something like this in mind:

    public static AfterMatchSkipStrategy skipToFirst(String patternName) {
        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
    }

    public static AfterMatchSkipStrategy skipToLast(String patternName) {
        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
    }

    public static AfterMatchSkipStrategy skipPastLastEvent() {
        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
    }

    public static AfterMatchSkipStrategy skipToNextEvent() {
        return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
    }

Then the usage will be:

    AfterMatchSkipStrategy.skipToLast("end")

instead of:

    new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")
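
For reference, a self-contained variant of the factory-method idea is sketched here. It is an assumption-laden illustration rather than the class from the pull request: `SkipStrategySketch`, `Kind`, and `requireName` are hypothetical names, and the private constructor plus early null check are additions of this sketch.

```java
/** Hypothetical stand-alone version of the factory-method proposal; not the Flink class. */
final class SkipStrategySketch {

    enum Kind { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final Kind kind;
    private final String patternName; // null unless the strategy targets a named pattern

    // Private, so callers must go through the factory methods below.
    private SkipStrategySketch(Kind kind, String patternName) {
        this.kind = kind;
        this.patternName = patternName;
    }

    static SkipStrategySketch skipToFirst(String patternName) {
        return new SkipStrategySketch(Kind.SKIP_TO_FIRST, requireName(patternName));
    }

    static SkipStrategySketch skipToLast(String patternName) {
        return new SkipStrategySketch(Kind.SKIP_TO_LAST, requireName(patternName));
    }

    static SkipStrategySketch skipPastLastEvent() {
        return new SkipStrategySketch(Kind.SKIP_PAST_LAST_EVENT, null);
    }

    static SkipStrategySketch skipToNextEvent() {
        return new SkipStrategySketch(Kind.SKIP_TO_NEXT_EVENT, null);
    }

    Kind getKind() {
        return kind;
    }

    String getPatternName() {
        return patternName;
    }

    private static String requireName(String patternName) {
        if (patternName == null) {
            throw new IllegalArgumentException("patternName must not be null");
        }
        return patternName;
    }
}
```

Because only the two pattern-targeting factories accept a name, the signature itself documents which strategies need one.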




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119856#comment-16119856
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132169303
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+   private void checkPatternSkipStrategy() {
+   AfterMatchSkipStrategy afterMatchSkipStrategy = 
currentPattern.getAfterMatchSkipStrategy();
+   if (afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+   afterMatchSkipStrategy.getStrategy() == 
AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+   Pattern pattern = currentPattern;
+   while 
(!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+   pattern = pattern.getPrevious();
+   }
+   // pattern name match check.
+   if (pattern == null) {
--- End diff --

This condition can never be true. If `pattern.getPrevious()` returns `null`, 
the `while` loop throws a `NullPointerException` before this check is reached.

A test for this case is missing.
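
A null-safe version of such a backwards walk could look like the sketch below. It uses a hypothetical `Node` class in place of Flink's `Pattern`, so the names `PatternLookupSketch`, `Node`, and `findByName` are assumptions made for illustration.

```java
/** Hypothetical sketch of a null-safe walk over a chain of patterns; not Flink code. */
final class PatternLookupSketch {

    /** Stand-in for a pattern that links to the pattern before it. */
    static final class Node {
        final String name;
        final Node previous;

        Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }
    }

    /** Walks backwards from the last pattern; returns null when no pattern has the given name. */
    static Node findByName(Node last, String name) {
        Node current = last;
        // Checking for null first means the loop ends instead of dereferencing null.
        while (current != null && !current.name.equals(name)) {
            current = current.previous;
        }
        return current;
    }
}
```

With the null check inside the loop condition, a later `if (result == null)` test becomes reachable and can report an unknown pattern name.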




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119858#comment-16119858
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132166853
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -322,6 +336,64 @@ public void resetNFAChanged() {
 
}
 
+   Set discardEvents = new HashSet<>();
--- End diff --

Move the whole added block to a new method. The `process` method is already 
long.
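
The extraction could be shaped roughly as follows. This is a sketch under assumptions, not the refactoring that was merged: `DiscardExtractionSketch`, `discardForSkipToLast`, and integer event ids are hypothetical, and the helper mirrors the `SKIP_TO_LAST` branch of the diff, assuming each match iterates its pattern names in pattern order.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of moving the discard computation out of a long method; not Flink code. */
final class DiscardExtractionSketch {

    /** Stand-in for the long process method: after matching, it only delegates. */
    static Set<Integer> process(List<Map<String, List<Integer>>> matches, String patternName) {
        // ... the existing matching logic would run here ...
        return discardForSkipToLast(matches, patternName);
    }

    /**
     * Collects every event before the last event mapped to patternName.
     * Assumes each map iterates in pattern order (e.g. a LinkedHashMap).
     */
    static Set<Integer> discardForSkipToLast(
            List<Map<String, List<Integer>>> matches, String patternName) {
        Set<Integer> discard = new HashSet<>();
        for (Map<String, List<Integer>> match : matches) {
            boolean matched = false;
            for (Map.Entry<String, List<Integer>> entry : match.entrySet()) {
                List<Integer> events = entry.getValue();
                if (entry.getKey().equals(patternName)) {
                    matched = true;
                    // Keep the last event of the target pattern, discard the ones before it.
                    discard.addAll(events.subList(0, Math.max(0, events.size() - 1)));
                } else if (!matched) {
                    discard.addAll(events);
                }
            }
        }
        return discard;
    }
}
```

A second helper for `SKIP_TO_FIRST` would follow the same outline, which also makes each branch testable on its own.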




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119865#comment-16119865
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132176196
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String 
patternName) {
+   if (patternName == null && (strategy == 
SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
+   throw new IllegalArgumentException("the patternName 
field can not be empty when SkipStrategy is " + strategy);
+   }
+   this.strategy = strategy;
+   this.patternName = patternName;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getPatternName() {
+   return patternName;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", patternName=" + patternName +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_EVENT,
+   SKIP_PAST_LAST_EVENT,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the 
configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119854#comment-16119854
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132168886
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -150,6 +160,59 @@ long getWindowTime() {
}
 
/**
+* Check pattern after match skip strategy.
+*/
+   private void checkPatternSkipStrategy() {
+       AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
--- End diff --

Just use `this.afterMatchSkipStrategy`


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static  PatternStream pattern(DataStream input, Pattern 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is 
> how the CEP library currently behaves.
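
The four options in the description differ only in where matching resumes after a match. The following is a minimal sketch of that rule, assuming a match is represented as a map from row pattern variable to the ascending row indices it matched. The class `SkipOptionsSketch`, its `Option` enum and `resumeIndex` are hypothetical names for illustration and are not part of the Flink CEP API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Flink CEP.
final class SkipOptionsSketch {

    enum Option { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the row index at which pattern matching resumes.
     * `match` maps each row pattern variable to the ascending row indices it matched;
     * `rpv` is only consulted for SKIP_TO_FIRST and SKIP_TO_LAST.
     */
    static int resumeIndex(Map<String, List<Integer>> match, Option option, String rpv) {
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        for (List<Integer> rows : match.values()) {
            for (int row : rows) {
                first = Math.min(first, row);
                last = Math.max(last, row);
            }
        }
        if (first > last) {
            throw new IllegalArgumentException("empty match");
        }
        switch (option) {
            case SKIP_TO_NEXT_ROW:
                return first + 1;               // row after the first row of the match
            case SKIP_PAST_LAST_ROW:
                return last + 1;                // row after the last row of the match
            case SKIP_TO_FIRST:
                return rowsOf(match, rpv).get(0);
            case SKIP_TO_LAST: {
                List<Integer> mapped = rowsOf(match, rpv);
                return mapped.get(mapped.size() - 1);
            }
            default:
                throw new IllegalArgumentException("unknown option: " + option);
        }
    }

    // Mirrors the constraint that SKIP TO FIRST/LAST need a variable that actually matched rows.
    private static List<Integer> rowsOf(Map<String, List<Integer>> match, String rpv) {
        List<Integer> rows = match.get(rpv);
        if (rows == null || rows.isEmpty()) {
            throw new IllegalArgumentException("no row is mapped to variable " + rpv);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("A", Arrays.asList(0, 1));
        match.put("B", Arrays.asList(2, 3));
        match.put("C", Arrays.asList(4));
        System.out.println(resumeIndex(match, Option.SKIP_TO_NEXT_ROW, null));   // 1
        System.out.println(resumeIndex(match, Option.SKIP_PAST_LAST_ROW, null)); // 5
        System.out.println(resumeIndex(match, Option.SKIP_TO_FIRST, "B"));       // 2
        System.out.println(resumeIndex(match, Option.SKIP_TO_LAST, "B"));        // 3
    }
}
```

The sketch only computes a resume position. How the NFA discards partial matches that start before that position is a separate concern and is what the pull request discussion is about.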



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119866#comment-16119866
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132175365
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java
 ---
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * After match skip tests.
+ */
+public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
+
+   private String resultPath;
+   private String expected;
+
+   private String lateEventPath;
+   private String expectedLateEvents;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   @Before
+   public void before() throws Exception {
+       resultPath = tempFolder.newFile().toURI().toString();
+       expected = "";
+
+       lateEventPath = tempFolder.newFile().toURI().toString();
+       expectedLateEvents = "";
+   }
+
+   @After
+   public void after() throws Exception {
+       compareResultsByLinesInMemory(expected, resultPath);
+       compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
+   }
+
+   private PatternSelectFunction newIdSelectFunction(String ... names) {
+       return new PatternSelectFunction() {
+
+           @Override
+           public String select(Map pattern) {
+               StringBuilder builder = new StringBuilder();
+               for (String name: names) {
+                   for (Event e : pattern.get(name)) {
+                       builder.append(e.getId()).append(",");
+                   }
+               }
+               return builder.toString();
+           }
+       };
+   }
+
+   @Test
+   public void testSkipToNext() throws Exception {
+       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+       DataStream input = env.fromElements(
+           new Event(1, "a", 0.0),
+           new Event(2, "a", 0.0),
+           new Event(3, "a", 0.0),
+           new Event(4, "a", 0.0),
+           new Event(5, "a", 0.0),
+           new Event(6, "a", 0.0)
+       );
+       Pattern pattern = Pattern.begin("start",
+           new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
+           .where(new SimpleCondition() {
+
+               @Override
+               public boolean filter(Event value) throws Exception {
+                   return value.getName().equals("a");
+               }
+           }).times(3);
+       DataStream result = CEP.pattern(input, 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119852#comment-16119852
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132139074
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -322,6 +336,64 @@ public void resetNFAChanged() {
 
}
 
+   Set discardEvents = new HashSet<>();
+   switch(afterMatchSkipStrategy.getStrategy()) {
+   case SKIP_TO_LAST:
+   for (Map resultMap: result) {
+   boolean matched = false;
--- End diff --

How about:

    for (Map resultMap: result) {
        for (Map.Entry keyMatches : resultMap.entrySet()) {
            if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
                discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
                break;
            } else {
                discardEvents.addAll(keyMatches.getValue());
            }
        }
    }

This way we will stop whenever we reach the matching pattern.
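
The suggested loop can be tried in isolation. The following is a minimal sketch under two assumptions that the comment does not state: events are plain strings, and each match map iterates in pattern order (a `LinkedHashMap` here). `DiscardSketch` and `eventsToDiscard` are hypothetical names and are not part of `NFA.java`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone version of the suggested SKIP_TO_LAST loop.
final class DiscardSketch {

    /**
     * Collects every event that precedes the last event mapped to `skipToPattern`.
     * Assumes `match` iterates in pattern order and that each list is non-empty.
     */
    static Set<String> eventsToDiscard(Map<String, List<String>> match, String skipToPattern) {
        Set<String> discardEvents = new HashSet<>();
        for (Map.Entry<String, List<String>> keyMatches : match.entrySet()) {
            List<String> events = keyMatches.getValue();
            if (keyMatches.getKey().equals(skipToPattern)) {
                // Keep only the last event of the target pattern, then stop.
                discardEvents.addAll(events.subList(0, events.size() - 1));
                break;
            } else {
                // Everything mapped to an earlier pattern is discarded.
                discardEvents.addAll(events);
            }
        }
        return discardEvents;
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new LinkedHashMap<>();
        match.put("a", Arrays.asList("a1", "a2"));
        match.put("b", Arrays.asList("b1", "b2", "b3"));
        match.put("c", Arrays.asList("c1"));
        // Discards a1, a2, b1 and b2; b3 and c1 are kept.
        System.out.println(eventsToDiscard(match, "b"));
    }
}
```

The early `break` is what the comment refers to: patterns after the skip target are never visited. The result depends on the iteration order of the map, so a map without a defined order would need a different approach.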





[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-08 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118321#comment-16118321
 ] 

Dawid Wysakowicz commented on FLINK-7169:
-

Hi [~ychen], Sorry for the delay. I will definitely review it this week. 



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-08 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118297#comment-16118297
 ] 

Yueting Chen commented on FLINK-7169:
-

[~dawidwys] [~dian.fu]
I updated the PR several days ago; could you please help review it? 
Thanks.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114077#comment-16114077
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys @dianfu I've updated the approach according to the document. Feel 
free to comment.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107093#comment-16107093
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dianfu Thanks for your review. 
I found that @dawidwys wrote a draft about the implementation of this JIRA issue. I'll go 
through that first and address those issues in this PR later. 




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107090#comment-16107090
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317270
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
--- End diff --

Thanks for pointing it out.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107091#comment-16107091
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317411
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
+               resultingComputationStates.add(createStartComputationState(computationState, event));
+           }
+           break;
+       case SKIP_TO_FIRST:
+           if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+               !nextState.getName().equals(currentState.getName())) {
+               ComputationState startComputationState = createStartComputationState(computationState, event);
+               if (callLevel > 0) {
--- End diff --

Because we need to detect whether there is an infinite loop; I use 
`callLevel` to track that here.
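
As a toy model of that guard (not the NFA logic itself), the sketch below re-feeds an event once when it matches the skip target and aborts if the re-fed event would trigger another restart. `CallLevelGuardSketch` and `feed` are hypothetical names. The sketch passes `callLevel + 1` to the recursive call; in Java, an argument written as `callLevel++` evaluates to the value before the increment, so the callee would still receive the old level.

```java
// Hypothetical toy model of a recursion-depth guard; not taken from NFA.java.
final class CallLevelGuardSketch {

    /**
     * Returns the call level at which processing of `event` finished.
     * An event equal to `skipTarget` is re-fed to a fresh start; a second
     * restart for the same event is treated as an infinite loop.
     */
    static int feed(String event, String skipTarget, int callLevel) {
        if (!event.equals(skipTarget)) {
            return callLevel; // nothing to re-feed
        }
        if (callLevel > 0) {
            throw new IllegalStateException(
                "infinite loop: skip target matched again while re-feeding " + event);
        }
        // Pass the incremented level explicitly so the callee can see it.
        return feed(event, skipTarget, callLevel + 1);
    }

    public static void main(String[] args) {
        System.out.println(feed("a", "b", 0)); // 0
        try {
            feed("b", "b", 0);
        } catch (IllegalStateException e) {
            System.out.println("aborted: " + e.getMessage());
        }
    }
}
```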




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107088#comment-16107088
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317122
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
+               resultingComputationStates.add(createStartComputationState(computationState, event));
+           }
+           break;
+       case SKIP_TO_FIRST:
+           if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

Yes, you are right.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-31 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16107012#comment-16107012
 ] 

Dawid Wysakowicz commented on FLINK-7169:
-

Hi [~ychen]

I've tried to put some of my ideas regarding this issue into a Google doc: 
https://docs.google.com/document/d/1XHgn5FXHukcv9VzWpQeSLhh-6adJ-IH2hZ_CwytJvfo/edit?usp=sharing
I think the most important section is the examples, where I've put some tricky 
(at least for me ;) ) cases. Happy to see your comments. It would also be great if 
we could add some more corner cases.

Also, [~kkl0u] and [~dian.fu], it would be great if you could add some thoughts.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106770#comment-16106770
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264936
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
+               resultingComputationStates.add(createStartComputationState(computationState, event));
+           }
+           break;
+       case SKIP_TO_FIRST:
+           if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

This should use `NFAStateNameHandler.getOriginalNameFromInternal()` to compare 
state names.
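
To illustrate why the raw comparison can miss, here is a minimal sketch that assumes internal state names are the user-given pattern name followed by a delimiter and a suffix. The `:` delimiter, the class `StateNameSketch` and both helpers are assumptions made for this example; they stand in for the real handler and are not copied from Flink.

```java
// Hypothetical stand-in for the name handling; delimiter and helpers are assumptions.
final class StateNameSketch {

    private static final String DELIM = ":"; // assumed separator between name and suffix

    /** Strips the assumed internal suffix, e.g. "middle:2" becomes "middle". */
    static String originalName(String internalName) {
        return internalName.split(DELIM)[0];
    }

    /** Compares a state's internal name with the pattern name given by the user. */
    static boolean sameStage(String internalStateName, String patternName) {
        return originalName(internalStateName).equals(patternName);
    }

    public static void main(String[] args) {
        System.out.println("middle:2".equals("middle"));     // false: raw comparison misses
        System.out.println(sameStage("middle:2", "middle")); // true
        System.out.println(sameStage("middle", "middle"));   // true
    }
}
```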




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106772#comment-16106772
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130264164
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
--- End diff --

This should also consider the **Proceed to Final state** situation.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106769#comment-16106769
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130266394
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_EVENT:
+           if (nextState.isFinal()) {
+               resultingComputationStates.add(createStartComputationState(computationState, event));
+           }
+           break;
+       case SKIP_TO_FIRST:
+           if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+               !nextState.getName().equals(currentState.getName())) {
+               ComputationState startComputationState = createStartComputationState(computationState, event);
+               if (callLevel > 0) {
+                   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+               }
+               // feed current matched event to the state.
+               Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+               resultingComputationStates.addAll(computationStates);
+           } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
+               throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+           }
+           break;
+       case SKIP_TO_LAST:
+           if (currentState.getName().equals(skipStrategy.getPatternName()) &&
+               !nextState.getName().equals(currentState.getName())) {
+               ComputationState startComputationState = createStartComputationState(computationState, event);
+               if (callLevel > 0) {
+                   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+               }
+               // feed current matched event to the state.
+               Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
+               resultingComputationStates.addAll(computationStates);
+           }
+           break;
+   }
break;
}
}
 
-   if (computationState.isStartState()) {
-       int totalBranches = calculateIncreasingSelfState(
-           outgoingEdges.getTotalIgnoreBranches(),
-           outgoingEdges.getTotalTakeBranches());
-
-       DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
-       ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
-   

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106771#comment-16106771
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130265354
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_EVENT:
+       if (nextState.isFinal()) {
+           resultingComputationStates.add(createStartComputationState(computationState, event));
+       }
+       break;
+   case SKIP_TO_FIRST:
+       if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+           !nextState.getName().equals(currentState.getName())) {
+           ComputationState startComputationState = createStartComputationState(computationState, event);
+           if (callLevel > 0) {
--- End diff --

Why is `callLevel` needed here?


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
> Fix For: 1.4.0
>
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support AFTER MATCH SKIP function in CEP API.
> There're four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function in the `CEP` class that takes an 
> AfterMatchSkipStrategy as an additional parameter.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that is 
> how the CEP library currently behaves.
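
The four options in the description above can be read as a rule for computing the row at which matching resumes. Below is a minimal sketch of that rule, assuming rows are addressed by index and a match covers consecutive rows. The names `SkipResume`, `Strategy` and `resumeIndex` are hypothetical and are not part of the Flink CEP API or of the pull request.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical illustration only; not Flink's AfterMatchSkipStrategy.
final class SkipResume {

    enum Strategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * @param matchStart index of the first row of the current match
     * @param variables  pattern variable each matched row was mapped to, in row order
     * @param rpv        variable used by SKIP_TO_FIRST / SKIP_TO_LAST, ignored otherwise
     * @return the row index to resume at, or empty if no row was mapped to rpv
     */
    static OptionalInt resumeIndex(Strategy strategy, int matchStart,
                                   List<String> variables, String rpv) {
        switch (strategy) {
            case SKIP_TO_NEXT_ROW:
                // the row after the first row of the match
                return OptionalInt.of(matchStart + 1);
            case SKIP_PAST_LAST_ROW:
                // the row after the last row of the match
                return OptionalInt.of(matchStart + variables.size());
            case SKIP_TO_FIRST: {
                int offset = variables.indexOf(rpv);
                return offset < 0 ? OptionalInt.empty() : OptionalInt.of(matchStart + offset);
            }
            case SKIP_TO_LAST: {
                int offset = variables.lastIndexOf(rpv);
                return offset < 0 ? OptionalInt.empty() : OptionalInt.of(matchStart + offset);
            }
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }
}
```

For a match that starts at row 10 and maps its rows to `A B B C`, this sketch resumes at row 11 for SKIP TO NEXT ROW, row 14 for SKIP PAST LAST ROW, row 11 for SKIP TO FIRST B and row 12 for SKIP TO LAST B. The empty result marks the case where the variable was not mapped at all, which the caller has to handle.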



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-28 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16106008#comment-16106008
 ] 

Yueting Chen commented on FLINK-7169:
-

Hi [~dawidwys], 
Regarding the empty match issue, I am not sure I understand your point. Could you 
show me an example, if possible?



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104892#comment-16104892
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user litrain commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys @yestinchen Thanks for the discussion. I am also working on the 
empty match issue now. Please have a look at 
https://issues.apache.org/jira/browse/FLINK-7292. 




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104735#comment-16104735
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4331
  
Hi @yestinchen 

Thanks for your feedback. 

Right now, I just want to address the empty match issue. It is not easy to 
apply that definition to unbounded data. Under the SQL specification, at the end of a 
partition we can deterministically decide whether a partial match is empty or not. 
That is not the case for unbounded data, as events arriving later may still make the 
partial matches empty. 

Those are the nuances I think should be addressed first, but I agree there 
are many similarities.

I will try to address the rest later today, but I think we should 
continue in the JIRA.





[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104350#comment-16104350
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4331
  
Sorry for the late response. I think this feature is very useful, and I agree that 
we should be clear about how each skip strategy is supposed to behave. 
I noticed that there is already some discussion in FLINK-3703 that we can 
refer to. I will take a look at this PR and at FLINK-3703 over the next two days and 
will post my thoughts.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16104311#comment-16104311
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys Thanks for the review. 

Problem 1 is easy to fix: we can just start a new match process if the only 
remaining computation state reaches stopState.
Problem 2 cannot be avoided with the current approach. It is impossible to 
know whether there are potential matches. 

I think the best way to implement this correctly is to try to start a new 
match process after processing each event, and to discard unfinished match processes 
after a successful match according to the skip strategy. In order to do that, 
we need to keep the logical order of the events, which is the original idea I 
proposed. 
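
A minimal sketch of the discard step in that proposal, assuming every event is assigned a logical position in arrival order and every unfinished match process remembers the position it started at. `PartialMatchPruning`, `PartialMatch` and `discardBefore` are hypothetical names; none of them exist in the NFA code of this PR.

```java
import java.util.List;

// Hypothetical illustration only; not part of Flink's NFA.
final class PartialMatchPruning {

    /** An unfinished match process, identified by the logical position of its first event. */
    static final class PartialMatch {
        final long startPosition;

        PartialMatch(long startPosition) {
            this.startPosition = startPosition;
        }
    }

    /**
     * Drops every unfinished match process that started before the position the
     * skip strategy resumes at, and returns how many were dropped.
     */
    static int discardBefore(List<PartialMatch> running, long resumePosition) {
        int sizeBefore = running.size();
        running.removeIf(m -> m.startPosition < resumePosition);
        return sizeBefore - running.size();
    }
}
```

With match processes started at positions 0 to 3 and a resume position of 3, only the process that started at position 3 survives. The logical position is what makes this comparison possible, which is why the event order has to be kept.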

As for your general notes, I have some ideas:

1. I agree that Oracle's specification is designed for bounded data. 
But match recognition on unbounded data is very similar to that on bounded data, since 
all data is processed one by one and no bound 
information is needed. As for **_empty match_**, I think we can just use Oracle's 
definition:
> Some patterns permit empty matches. For example:
> PATTERN (A*)
> can be matched by zero or more rows that are mapped to A.
> An empty match does not map any rows to primary row pattern variables; 
> nevertheless, an empty match has a starting row. For example, there can be an 
> empty match at the first row of a row pattern partition, an empty match at the 
> second row of a row pattern partition, etc. An empty match is assigned a 
> sequential match number, based on the ordinal position of its starting row, the 
> same as any other match.

2. I feel uncomfortable with the RuntimeExceptions too. But these 
exceptions are important for keeping the skip semantics right. I understand 
your main concern is that exceptions will stop the matching process, which is 
unacceptable for an online streaming service. To address this, I think we can 
introduce a default strategy (SKIP_TO_NEXT_EVENT, for example). If one of these 
exceptions occurs, we can use the default strategy to continue the match process 
and switch back to the configured strategy after a successful match. We can also add a switch 
to let users decide whether to enable this feature.

3. I still think it's useful to support these skip strategies. I don't know 
why Esper does not support them.

4. Thanks for the related information. I took a brief look at the PR, which 
is very similar to this one. I wonder why it was closed without being merged into 
master?
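
The fallback described in point 2 could look roughly like the sketch below. It assumes the configured strategy is SKIP_TO_LAST and that the fallback is "resume at the next event". `SkipFallback` and `resumeOrFallback` are hypothetical names used only for illustration.

```java
import java.util.List;

// Hypothetical illustration only; not part of this PR.
final class SkipFallback {

    /**
     * Computes the resume index for SKIP_TO_LAST rpv. Instead of throwing, it falls
     * back to the next event when the skip cannot be applied.
     *
     * @param matchStart index of the first event of the current match
     * @param variables  pattern name each matched event was mapped to, in event order
     * @param rpv        pattern name to skip to
     */
    static int resumeOrFallback(int matchStart, List<String> variables, String rpv) {
        int offset = variables.lastIndexOf(rpv);
        // offset < 0: no event was mapped to rpv (e.g. an optional pattern was skipped).
        // offset == 0: resuming at the match's own first event would find the same
        // match again, which is the infinite loop the exceptions guard against.
        if (offset <= 0) {
            return matchStart + 1; // fallback: behave like SKIP_TO_NEXT_EVENT
        }
        return matchStart + offset;
    }
}
```

For pattern `A B? C` and the match `a c` starting at index 5, the variables are `[A, C]`, nothing is mapped to `B`, and the sketch resumes at index 6 rather than failing the job.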

Looking forward to your feedback. Thanks.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16103166#comment-16103166
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4331
  
Hi @yestinchen ,
Thanks for the update.

After a second round of review, I found several problems with the current approach. 
In most cases it returns only the first match in a stream.

1. Let's analyze a pattern `A B C` with `SKIP_TO_FIRST C` and a sequence 
`a1 b1 c1 a2 b2 c2`. It will return only `a1 b1 c1` and will leave the NFA 
without any valid `ComputationState`s, which stops processing.

2. Another problem is that we do not handle matches that can potentially 
finish before previously started ones. E.g. for the Pattern 

```
Pattern<Event, ?> pattern = Pattern.<Event>begin("ab").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        // "ab" accepts either an "a" or a "b" event
        return value.getName().equals("a") || value.getName().equals("b");
    }
}).followedBy("c").where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        // "c" must carry the same price as the first event matched by "ab"
        return value.getName().equals("c") &&
            ctx.getEventsForPattern("ab").iterator().next().getPrice() == value.getPrice();
    }
}).setAfterMatchSkipStrategy(new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT));
```

and a sequence `a(price = 1) b(price = 2) c(price = 2)`. I think the desired 
behaviour would be to start a new matching after the `c` event, but it won't, as the 
matching started at `a` and will not start at `b`.
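
For the first problem, the result one would expect is two matches. The sketch below shows this with a deliberately simplified matcher, assuming events are compared by name only and the pattern is strictly contiguous. `SkipToFirstScan` and `matchStarts` are hypothetical names, and this is not how the NFA works internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; a plain list scan, not Flink's NFA.
final class SkipToFirstScan {

    /**
     * Finds strictly contiguous occurrences of the pattern in the event names and,
     * after each match, resumes at the first event mapped to rpv.
     *
     * @return the start index of every match found
     */
    static List<Integer> matchStarts(List<String> events, List<String> pattern, String rpv) {
        List<Integer> starts = new ArrayList<>();
        int i = 0;
        while (i + pattern.size() <= events.size()) {
            if (events.subList(i, i + pattern.size()).equals(pattern)) {
                starts.add(i);
                int offset = pattern.indexOf(rpv);
                // Always advance by at least one event so that the scan cannot
                // resume at the start of the match it has just reported.
                i += Math.max(offset, 1);
            } else {
                i++;
            }
        }
        return starts;
    }
}
```

Calling `matchStarts` with the event names `a b c a b c`, the pattern `a b c` and `rpv = "c"` reports matches starting at indices 0 and 3: after the first match the scan resumes at `c1`, fails to start a match there, and then finds `a2 b2 c2`.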

---

Some general notes:

1. I think the SQL specification does not fit the CEP library well, 
as we do not operate on a partition/bounded collection of events, 
whereas the specification assumes such bounded data. I think we would 
benefit from some additional documentation on how the AFTER_MATCH clause works in 
the case of unbounded data. E.g. what does **_empty match_** mean:

> Note that the AFTER MATCH SKIP syntax only determines the point to resume 
> scanning for a match after a non-empty match. When an empty match is found, one 
> row is skipped (as if SKIP TO NEXT ROW had been specified). Thus an empty match 
> never causes one of these exceptions.

etc.

2. I really don't like that there are so many cases in which a `RuntimeException` 
can be thrown. I feel CEP is typically used for constantly running jobs 
that search for patterns in a stream rather than for ad-hoc queries. 
E.g. in the case of a Pattern like `A B? C` with `SKIP_TO_LAST B`, a sequence 
like `a c` results in an exception and the job being killed. In my opinion that 
does not fit a constantly running job well. From the operational side, running 
such Patterns would be at least interesting ;), as they depend so much on the 
arriving data.

3. I don't know the reasoning, but Esper, which was mentioned as the 
other library (besides Oracle) that supports the `MATCH_RECOGNIZE` clause, does not 
support `AFTER MATCH` at all.

4. I found out there was already ongoing work to introduce part of the 
`AFTER MATCH` clause (the `SKIP_PAST_LAST`). The corresponding JIRA: 
https://issues.apache.org/jira/browse/FLINK-3703 and closed PR: #2367 .

To sum up, thanks @yestinchen for the work. Unfortunately, I think the clause 
needs a bit more conceptual discussion before we can introduce this 
change. I think the `SKIP_PAST_LAST` behaviour would be very helpful (in fact 
there were already requests for it on the mailing list) and the most 
straightforward to implement. I would love to hear your opinions, @yestinchen, as well as 
those of @kl0u and @dianfu.





[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101390#comment-16101390
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129518272
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+   case SKIP_PAST_LAST_ROW:
+       if (nextState.isFinal()) {
+           resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+       }
+       break;
+   case SKIP_TO_FIRST:
+       if (nextState.getName().equals(skipStrategy.getRpv()) &&
+           !nextState.getName().equals(currentState.getName())) {
+           ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
--- End diff --

The NFA now keeps `startComputationState` instead of `startState`, so 
`outgoingEdges` can be calculated from the start state when needed. Is this 
right? 




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101388#comment-16101388
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517254
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
+   throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
+   }
+   }
+   this.strategy = strategy;
+   this.rpv = rpv;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getRpv() {
+   return rpv;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", rpv=" + rpv +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_ROW,
+   SKIP_PAST_LAST_ROW,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public AfterMatchSkipStrategyConfigSnapshot(
+   TypeSerializer enumSerializer,
+   TypeSerializer stringSerializer) {
+
+   super(enumSerializer, stringSerializer);
+   }
+
+   @Override
+   public int getVersion() {
+   return 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101386#comment-16101386
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517161
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
--- End diff --

It stands for Row Pattern Variable. I have already renamed it to `patternName`, 
which I think is clearer.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101387#comment-16101387
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517227
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
--- End diff --

done


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101381#comment-16101381
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516786
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
--- End diff --

I changed `ROW` to `EVENT`. Is that better?


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101368#comment-16101368
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516650
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
@@ -36,6 +40,14 @@
 * @return Resulting pattern stream
 */
public static  PatternStream pattern(DataStream input, Pattern pattern) {
-   return new PatternStream<>(input, pattern);
+   return new PatternStream<>(input, pattern, skipStrategy);
+   }
+
+   /**
+* Set the pattern's skip strategy after match.
+* @param afterMatchSkipStrategy the skip strategy to use.
+*/
+   public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
--- End diff --

Moved that into `Pattern`.


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16101358#comment-16101358
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129515022
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
         nextVersion,
         startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
+                if (callLevel > 0) {
+                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                }
+                // feed current matched event to the state.
+                Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
--- End diff --

SKIP_TO_FIRST and SKIP_TO_LAST must start the next match process at the 
first or last event matched by the specified pattern. For example, take the 
event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If we set the 
SkipStrategy to SKIP_TO_FIRST with the pattern name `B`, we should create a new 
`startComputationState` once `b1` has been processed, and the next match 
should start at event `b1`. So we need to manually feed `b1` to the newly 
created `startComputationState`.
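
To make the example concrete, here is a minimal standalone sketch of where 
matching resumes under each strategy. It is not the NFA code from this pull 
request: the class `SkipResumeSketch` and the method `resumeIndex` are 
hypothetical names used only for illustration, and the sketch assumes a match 
is described by its start index in the stream plus the pattern variable each 
matched event was mapped to.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not part of Flink's CEP library. */
final class SkipResumeSketch {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream index at which the next match attempt starts.
     *
     * @param matchStart       stream index of the first event of the current match
     * @param matchedVariables pattern variable of each matched event, in match order
     * @param strategy         the skip strategy to apply
     * @param patternName      variable used by SKIP_TO_FIRST / SKIP_TO_LAST, ignored otherwise
     */
    static int resumeIndex(int matchStart, List<String> matchedVariables,
            SkipStrategy strategy, String patternName) {
        switch (strategy) {
            case SKIP_TO_NEXT_ROW:
                // Resume at the event right after the first event of the match.
                return matchStart + 1;
            case SKIP_PAST_LAST_ROW:
                // Resume at the event right after the last event of the match.
                return matchStart + matchedVariables.size();
            case SKIP_TO_FIRST:
                // Resume at the first event mapped to patternName.
                return matchStart + requireOffset(matchedVariables.indexOf(patternName), patternName);
            case SKIP_TO_LAST:
                // Resume at the last event mapped to patternName.
                return matchStart + requireOffset(matchedVariables.lastIndexOf(patternName), patternName);
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    private static int requireOffset(int offset, String patternName) {
        if (offset < 0) {
            throw new IllegalArgumentException("No matched event is mapped to " + patternName);
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a1", "b1", "c1", "a2");
        List<String> match = Arrays.asList("A", "B", "C"); // a1 -> A, b1 -> B, c1 -> C
        for (SkipStrategy strategy : SkipStrategy.values()) {
            int index = resumeIndex(0, match, strategy, "B");
            System.out.println(strategy + " -> resume at " + stream.get(index));
        }
    }
}
```

With SKIP_TO_FIRST and pattern name `B`, the sketch resumes at `b1`, which 
is the event that has to be fed again to the new start state.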


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100039#comment-16100039
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129298052
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
+   throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
+   }
+   }
+   this.strategy = strategy;
+   this.rpv = rpv;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getRpv() {
+   return rpv;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", rpv=" + rpv +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_ROW,
+   SKIP_PAST_LAST_ROW,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public AfterMatchSkipStrategyConfigSnapshot(
+   TypeSerializer enumSerializer,
+   TypeSerializer stringSerializer) {
+
+   super(enumSerializer, stringSerializer);
+   }
+
+   @Override
+   public int getVersion() {
+   return VERSION;
 

[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100041#comment-16100041
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129296835
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
--- End diff --

What does `rpv` mean?


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100040#comment-16100040
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129303933
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
         nextVersion,
         startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
+                if (callLevel > 0) {
+                    throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+                }
+                // feed current matched event to the state.
+                Collection computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
--- End diff --

Why do we feed the event?


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100038#comment-16100038
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129297040
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
--- End diff --

Why not put it into the previous `if`?
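
A minimal sketch of what merging the two checks could look like. The class 
name `SkipStrategyValidationSketch` and the field name `patternName` are 
illustrative assumptions, not the code that was eventually merged; only the 
enum constants and the validation rule are taken from the diff above.

```java
/** Hypothetical illustration only; not the class from the pull request. */
final class SkipStrategyValidationSketch {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    private final String patternName;

    SkipStrategyValidationSketch(SkipStrategy strategy, String patternName) {
        // Both conditions folded into a single check instead of two nested ifs.
        boolean needsPatternName =
            strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST;
        if (needsPatternName && patternName == null) {
            throw new IllegalArgumentException(
                "The patternName field cannot be null when SkipStrategy is " + strategy);
        }
        this.strategy = strategy;
        this.patternName = patternName;
    }

    SkipStrategy getStrategy() {
        return strategy;
    }

    String getPatternName() {
        return patternName;
    }
}
```

The behavior is unchanged: only SKIP_TO_FIRST and SKIP_TO_LAST reject a 
missing pattern name, while the other two strategies accept `null`.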


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100036#comment-16100036
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129303049
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
         nextVersion,
         startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
--- End diff --

The `outgoingEdges` parameter will not work in this case. Previously the 
assumption was that we always created the new `start` state while in the 
starting state, so we could calculate the version for the new run based on the 
outgoing edges from the start state. 

In this case the `outgoingEdges` do not correspond to the starting state.


> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>
> In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we 
> need to support the AFTER MATCH SKIP function in the CEP API.
> There are four options in AFTER MATCH SKIP, listed as follows:
> 1. AFTER MATCH SKIP TO NEXT ROW: resume pattern matching at the row after the 
> first row of the current match.
> 2. AFTER MATCH SKIP PAST LAST ROW: resume pattern matching at the next row 
> after the last row of the current match.
> 3. AFTER MATCH SKIP TO FIRST *RPV*: resume pattern matching at the first row 
> that is mapped to the row pattern variable RPV.
> 4. AFTER MATCH SKIP TO LAST *RPV*: resume pattern matching at the last row 
> that is mapped to the row pattern variable RPV.
> I think we can introduce a new function to the `CEP` class, which takes a new 
> parameter of type AfterMatchSkipStrategy.
> The new API may look like this:
> {code}
> public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> 
> pattern, AfterMatchSkipStrategy afterMatchSkipStrategy) 
> {code}
> We can also make `SKIP TO NEXT ROW` the default option, because that's 
> how the CEP library currently behaves.
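
The four options quoted above can be illustrated with a small toy model. This is only a sketch under assumptions: rows are identified by plain integer indices, and the class `SkipOptionsSketch`, the enum `Skip`, and the method `resumeIndex` are hypothetical names made up for illustration. None of this is Flink code or the API proposed in the pull request.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of the four AFTER MATCH SKIP options; hypothetical names, not Flink code. */
class SkipOptionsSketch {

    enum Skip { TO_NEXT_ROW, PAST_LAST_ROW, TO_FIRST, TO_LAST }

    /**
     * Returns the row index at which matching resumes after a match.
     *
     * @param matchRows ascending indices of the rows in the current match
     * @param rpvRows   ascending indices of the rows mapped to the chosen
     *                  row pattern variable (only read for TO_FIRST / TO_LAST)
     */
    static int resumeIndex(Skip skip, List<Integer> matchRows, List<Integer> rpvRows) {
        switch (skip) {
            case TO_NEXT_ROW:
                // The row after the first row of the current match.
                return matchRows.get(0) + 1;
            case PAST_LAST_ROW:
                // The row after the last row of the current match.
                return matchRows.get(matchRows.size() - 1) + 1;
            case TO_FIRST:
                // The first row mapped to the pattern variable.
                return rpvRows.get(0);
            case TO_LAST:
                // The last row mapped to the pattern variable.
                return rpvRows.get(rpvRows.size() - 1);
            default:
                throw new IllegalArgumentException("unknown option: " + skip);
        }
    }

    public static void main(String[] args) {
        // A match covering rows 3..6, where rows 5 and 6 are mapped to the variable.
        List<Integer> match = Arrays.asList(3, 4, 5, 6);
        List<Integer> rpv = Arrays.asList(5, 6);
        for (Skip s : Skip.values()) {
            System.out.println(s + " -> resume at row " + resumeIndex(s, match, rpv));
        }
    }
}
```

Note that TO_NEXT_ROW allows overlapping matches, while PAST_LAST_ROW never reuses a row of the previous match.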



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100035#comment-16100035
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129296661
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
--- End diff --

There is no notion of a ROW in the CEP library.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16100037#comment-16100037
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129296465
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
@@ -36,6 +40,14 @@
      * @return Resulting pattern stream
      */
     public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
-        return new PatternStream<>(input, pattern);
+        return new PatternStream<>(input, pattern, skipStrategy);
+    }
+
+    /**
+     * Set the pattern's skip strategy after match.
+     * @param afterMatchSkipStrategy the skip strategy to use.
+     */
+    public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
--- End diff --

As @dianfu said in the corresponding JIRA, the `AfterMatchSkipStrategy` 
should be part of the `Pattern` not `PatternStream`.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-22 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097349#comment-16097349
 ] 

Yueting Chen commented on FLINK-7169:
-

Hi [~dawidwys] [~dian.fu],
I opened the pull request. It would be great if you had time to review it. 
Thanks.




[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087046#comment-16087046
 ] 

ASF GitHub Bot commented on FLINK-7169:
---

GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4331

[FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7169

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4331


commit 28f2e0ab2b6fd38864017fc64d2c76a65c8f7574
Author: Yestin <873915...@qq.com>
Date:   2017-07-14T08:41:51Z

[FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP






[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-14 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16086956#comment-16086956
 ] 

Yueting Chen commented on FLINK-7169:
-

[~dian.fu] [~dawidwys] 
Thanks. That's a very good idea. I am implementing this in the way you 
mentioned. I think it works.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085643#comment-16085643
 ] 

Dawid Wysakowicz commented on FLINK-7169:
-

I have not had enough time to think through all the cases, but I also had 
something like what [~dian.fu] said in mind. 



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085637#comment-16085637
 ] 

Dian Fu commented on FLINK-7169:


Hi [~ychen],
Thanks a lot for working on this ticket. :)

For the API change, maybe it's better to add an API in {{Pattern}}, such as 
{{Pattern.setSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy)}}.

For the implementation of the {{AfterMatchSkipStrategy}}, I have a very rough 
thought. For example, for pattern {{a b*}}, if the skip strategy is {{AFTER 
MATCH SKIP TO FIRST b}}, we only add a new {{Start}} {{ComputationState}} once 
the first {{b}} is matched (instead of adding a new {{Start}} 
{{ComputationState}} once the {{Start}} {{ComputationState}} is matched, which 
is the current strategy). If this is feasible, we don't need to keep track of 
the event order any more. Thoughts?
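
The idea in this comment can be sketched as a toy check, assuming a single run is described by the list of state names it enters, one per event. The class `SkipToFirstSketch` and its methods are hypothetical and exist only for illustration; this is not Flink's NFA, and the real implementation may differ. The condition mirrors the one visible in the NFA.java diff of the pull request: a run moves into the state named after the pattern variable from a different state.

```java
import java.util.Arrays;
import java.util.List;

/** Toy illustration only; hypothetical names, not Flink's NFA. */
class SkipToFirstSketch {

    /** True when a run moves into the state named rpv from a different state. */
    static boolean entersRpv(String currentState, String nextState, String rpv) {
        return nextState.equals(rpv) && !nextState.equals(currentState);
    }

    /**
     * statePath lists the state entered on each event of one run, beginning
     * from "start". Returns the index of the event at which a new start state
     * would be added under SKIP TO FIRST rpv, or -1 if rpv is never entered.
     */
    static int startStateAddedAt(List<String> statePath, String rpv) {
        String current = "start";
        for (int i = 0; i < statePath.size(); i++) {
            String next = statePath.get(i);
            if (entersRpv(current, next, rpv)) {
                return i;
            }
            current = next;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Pattern a b*, events a1 b1 b2: the run enters a, then b, then stays in b.
        System.out.println(startStateAddedAt(Arrays.asList("a", "b", "b"), "b"));
    }
}
```

In this toy model the new start state is tied to a transition rather than to an event position, which is why no separate record of event order is needed.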



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085451#comment-16085451
 ] 

Yueting Chen commented on FLINK-7169:
-

[~dawidwys]
Thanks for the information.
I noticed that. But the counter only keeps the reference count of the value, so 
it has some limitations. For example:
We need to match a{2} from a1,a2,a3,a4, with the AFTER MATCH SKIP PAST LAST ROW 
option.
Without doubt, the first match is a1,a2. According to the SKIP option, we need 
to start the second match from a3.
But it's possible that all four events have the same timestamp (1, for 
example). After the first match, we can only get the timestamp of the final 
matched event. Without knowing the exact logical order of the four events, it's 
impossible to skip to the right position, and the counter can't help with that.
What I suggest is to keep a logical id for each event. It does not need to be 
continuous, but it should be monotonically increasing.
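
The limitation described above can be shown with a minimal sketch, assuming each event is tagged with a logical id on arrival. The classes `Event` and `Sequencer` and both helper methods are hypothetical illustrations, not part of Flink's SharedBuffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration of ordering events by a logical id; hypothetical names, not Flink code. */
class LogicalOrderSketch {

    /** An event tagged with a logical id assigned on arrival. */
    static final class Event {
        final String name;
        final long timestamp;
        final long logicalId;

        Event(String name, long timestamp, long logicalId) {
            this.name = name;
            this.timestamp = timestamp;
            this.logicalId = logicalId;
        }
    }

    /** Hands out strictly increasing ids; gaps between ids would also be fine. */
    static final class Sequencer {
        private long next = 0;

        Event tag(String name, long timestamp) {
            return new Event(name, timestamp, next++);
        }
    }

    /** Events strictly after lastMatched, judged by timestamp alone. */
    static List<String> afterByTimestamp(List<Event> events, Event lastMatched) {
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            if (e.timestamp > lastMatched.timestamp) {
                result.add(e.name);
            }
        }
        return result;
    }

    /** Events strictly after lastMatched, judged by logical id. */
    static List<String> afterByLogicalId(List<Event> events, Event lastMatched) {
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            if (e.logicalId > lastMatched.logicalId) {
                result.add(e.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Sequencer sequencer = new Sequencer();
        List<Event> events = new ArrayList<>();
        for (String name : new String[] {"a1", "a2", "a3", "a4"}) {
            events.add(sequencer.tag(name, 1L)); // all four share timestamp 1
        }
        Event lastMatched = events.get(1); // a2 closes the first match of a{2}
        System.out.println("by timestamp:  " + afterByTimestamp(events, lastMatched));
        System.out.println("by logical id: " + afterByLogicalId(events, lastMatched));
    }
}
```

With equal timestamps the timestamp comparison cannot find a3 and a4, whereas the logical id identifies them as the events after a2.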



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085409#comment-16085409
 ] 

Dawid Wysakowicz commented on FLINK-7169:
-

[~ychen] There is already such a variable, called `counter`.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085404#comment-16085404
 ] 

Yueting Chen commented on FLINK-7169:
-

[~jark] Thanks.

I just found out that the events stored in the SharedBuffer may have the same 
timestamp, which makes it impossible to determine which event should be 
discarded. I think we need to introduce a new variable to keep the logical 
order of the events.

I'll create a separate JIRA to address this issue.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-13 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085290#comment-16085290
 ] 

Jark Wu commented on FLINK-7169:


Thanks for your contribution. I moved this issue under FLINK-6935.



[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API

2017-07-12 Thread Yueting Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085214#comment-16085214
 ] 

Yueting Chen commented on FLINK-7169:
-

I am going to work on this, but I would like to hear your thoughts first. 
[~dian.fu] [~dawidwys][~kkl0u]

> Support AFTER MATCH SKIP function in CEP library API
> 
>
> Key: FLINK-7169
> URL: https://issues.apache.org/jira/browse/FLINK-7169
> Project: Flink
>  Issue Type: Task
>  Components: CEP
>Reporter: Yueting Chen
>Assignee: Yueting Chen
>

