[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533693#comment-16533693
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6171


> Unify AfterMatch semantics with SQL MATCH_RECOGNIZE
> ---
>
> Key: FLINK-9593
> URL: https://issues.apache.org/jira/browse/FLINK-9593
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533455#comment-16533455
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6171
  
Thanks @kl0u for the review. I've addressed points 1 and 3. As the second one 
touches some critical parts, let's address it in a separate JIRA.




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524984#comment-16524984
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198474417
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
             }
         }
 
-        discardComputationStatesAccordingToStrategy(
-            sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+        if (!potentialMatches.isEmpty()) {
+            nfaState.setStateChanged();
+        }
+
+        List<Map<String, List<T>>> result = new ArrayList<>();
+        if (afterMatchSkipStrategy.isSkipStrategy()) {
+            processMatchesAccordingToSkipStrategy(sharedBuffer,
+                nfaState,
+                afterMatchSkipStrategy,
+                potentialMatches,
+                result);
+        } else {
+            for (ComputationState match : potentialMatches) {
+                result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
--- End diff --

Instead of accessing the state for every match, why not pass all the 
matches to the shared buffer and fetch the common events only once? If two 
matches A and B share the event with id = 2, we fetch it from state only once.
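
The suggestion above can be illustrated with a minimal, self-contained sketch. It is not Flink code: `BatchedMatchMaterializer`, `materializeAll` and the `stateLookup` function are hypothetical names, and a match is modelled simply as a list of integer event ids. The sketch only shows the idea of resolving all matches in one pass with a per-call cache, so that an event shared by several matches is read from state once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class BatchedMatchMaterializer {

    /**
     * Resolves every match (modelled as a list of event ids) to its events,
     * calling stateLookup at most once per distinct id.
     */
    static <E> List<List<E>> materializeAll(
            List<List<Integer>> matches,
            Function<Integer, E> stateLookup) {
        // Cache lives only for this call, so it never outlives the state it mirrors.
        Map<Integer, E> cache = new HashMap<>();
        List<List<E>> result = new ArrayList<>();
        for (List<Integer> match : matches) {
            List<E> events = new ArrayList<>();
            for (Integer id : match) {
                // The (assumed expensive) state access runs only on a cache miss.
                events.add(cache.computeIfAbsent(id, stateLookup));
            }
            result.add(events);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        // Matches A = (1, 2) and B = (2, 3) share the event with id = 2.
        List<List<Integer>> matches =
            Arrays.asList(Arrays.asList(1, 2), Arrays.asList(2, 3));
        List<List<String>> out = BatchedMatchMaterializer.<String>materializeAll(
            matches,
            id -> {
                lookups[0]++;
                return "event-" + id;
            });
        // prints: [[event-1, event-2], [event-2, event-3]] lookups=3
        System.out.println(out + " lookups=" + lookups[0]);
    }
}
```

With per-match access the same input would need four lookups; the cache brings it down to three, one per distinct event.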




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524986#comment-16524986
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198473426
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.ComputationState;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ */
+public abstract class AfterMatchSkipStrategy implements Serializable {
+
+   private static final long serialVersionUID = -4048930333619068531L;
+
+   /**
+* Discards every partial match that contains event of the match preceding the first of *PatternName*.
+*
+* @param patternName the pattern name to skip to
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipToFirst(String patternName) {
+   return new SkipToFirstStrategy(patternName);
+   }
+
+   /**
+* Discards every partial match that contains event of the match preceding the last of *PatternName*.
+*
+* @param patternName the pattern name to skip to
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipToLast(String patternName) {
+   return new SkipToLastStrategy(patternName);
+   }
+
+   /**
+* Discards every partial match that contains event of the match.
+*
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy skipPastLastEvent() {
+   return SkipPastLastStrategy.INSTANCE;
+   }
+
+   /**
+* Every possible match will be emitted.
+*
+* @return the created AfterMatchSkipStrategy
+*/
+   public static AfterMatchSkipStrategy noSkip() {
+   return NoSkipStrategy.INSTANCE;
+   }
+
+   /**
+* Tells if the strategy may skip some matches.
+*
+* @return false if the strategy is NO_SKIP strategy
+*/
+   public abstract boolean isSkipStrategy();
+
+   /**
+* Prunes matches/partial matches based on the chosen strategy.
+*
+* @param partialMatches current partial matches
+* @param matchedResult  already completed matches
+* @param sharedBuffer   corresponding shared buffer
+* @throws Exception thrown if could not access the state
+*/
+   public void prune(
+   Collection partialMatches,
--- End diff --

The name `partialMatches` is misleading because we also use it with the 
`completedMatches`.




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524983#comment-16524983
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198473858
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -330,77 +328,85 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
             }
         }
 
-        discardComputationStatesAccordingToStrategy(
-            sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+        if (!potentialMatches.isEmpty()) {
+            nfaState.setStateChanged();
+        }
+
+        List<Map<String, List<T>>> result = new ArrayList<>();
+        if (afterMatchSkipStrategy.isSkipStrategy()) {
+            processMatchesAccordingToSkipStrategy(sharedBuffer,
+                nfaState,
+                afterMatchSkipStrategy,
+                potentialMatches,
+                result);
+        } else {
+            for (ComputationState match : potentialMatches) {
+                result.add(sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(match.getPreviousBufferEntry(),
+                    match.getVersion()).get(0)));
+                sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+            }
+        }
 
         return result;
     }
 
-    private void discardComputationStatesAccordingToStrategy(
-        final SharedBuffer sharedBuffer,
-        final Queue computationStates,
-        final Collection<Map<String, List<T>>> matchedResult,
-        final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+    private void processMatchesAccordingToSkipStrategy(
+        SharedBuffer sharedBuffer,
+        NFAState nfaState,
+        AfterMatchSkipStrategy afterMatchSkipStrategy,
+        PriorityQueue potentialMatches,
+        List<Map<String, List<T>>> result) throws Exception {
 
-        Set<T> discardEvents = new HashSet<>();
-        switch(afterMatchSkipStrategy.getStrategy()) {
-            case SKIP_TO_LAST:
-                for (Map<String, List<T>> resultMap: matchedResult) {
-                    for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-                            discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
-                            break;
-                        } else {
-                            discardEvents.addAll(keyMatches.getValue());
-                        }
-                    }
-                }
-                break;
-            case SKIP_TO_FIRST:
-                for (Map<String, List<T>> resultMap: matchedResult) {
-                    for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-                            break;
-                        } else {
-                            discardEvents.addAll(keyMatches.getValue());
-                        }
-                    }
-                }
-                break;
-            case SKIP_PAST_LAST_EVENT:
-                for (Map<String, List<T>> resultMap: matchedResult) {
-                    for (List<T> eventList: resultMap.values()) {
-                        discardEvents.addAll(eventList);
-                    }
-                }
-                break;
-        }
-        if (!discardEvents.isEmpty()) {
-            List discardStates = new ArrayList<>();
-            for (ComputationState computationState : computationStates) {
-                boolean discard = false;
-                Map<String, List<T>> partialMatch = extractCurrentMatches(sharedBuffer, computationState);
-                for (List list: 

[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524982#comment-16524982
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198472975
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java ---
@@ -79,18 +98,18 @@ public boolean equals(Object o) {
             return false;
         }
         NFAState nfaState = (NFAState) o;
-        return  Objects.equals(computationStates, nfaState.computationStates);
+        return Objects.equals(partialMatches, nfaState.partialMatches);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(computationStates, stateChanged);
+        return Objects.hash(partialMatches, stateChanged);
--- End diff --

Same as above.




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524985#comment-16524985
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/6171#discussion_r198472927
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java ---
@@ -79,18 +98,18 @@ public boolean equals(Object o) {
             return false;
         }
         NFAState nfaState = (NFAState) o;
-        return  Objects.equals(computationStates, nfaState.computationStates);
+        return Objects.equals(partialMatches, nfaState.partialMatches);
--- End diff --

What about the `completedMatches`?
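
To make the question concrete, here is a minimal sketch of a state holder whose `equals` and `hashCode` cover both collections. `MatchState` is a simplified, hypothetical stand-in and not Flink's `NFAState`; matches are modelled as plain strings. It assumes that both queues should take part in equality, which is what the comment asks about rather than something the diff confirms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified stand-in for a state object with two match queues. */
final class MatchState {
    private final List<String> partialMatches = new ArrayList<>();
    private final List<String> completedMatches = new ArrayList<>();

    void addPartial(String match) {
        partialMatches.add(match);
    }

    void addCompleted(String match) {
        completedMatches.add(match);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MatchState other = (MatchState) o;
        // Compare both collections, not only the partial matches.
        return Objects.equals(partialMatches, other.partialMatches)
            && Objects.equals(completedMatches, other.completedMatches);
    }

    @Override
    public int hashCode() {
        // Hash exactly the fields that equals compares, so equal objects hash alike.
        return Objects.hash(partialMatches, completedMatches);
    }
}
```

Hashing a field that `equals` ignores would let two equal objects produce different hash codes, which violates the `Object.hashCode` contract.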




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16514611#comment-16514611
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6171
  
Hi @dawidwys, can you explain a little how the semantics of 
`AfterMatch` differ from the previous implementation? I read the doc and feel a 
little confused. Thanks ;-)




[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513493#comment-16513493
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6171

[FLINK-9593] Unified After Match semantics with SQL MATCH_RECOGNIZE

## What is the purpose of the change

Unify the semantics of AfterMatch skip with the SQL standard to enable CEP and SQL 
integration.

## Brief change log

- partial/completed matches are pruned based on which one happened first.
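
As a rough illustration of pruning by "which one happened first", the sketch below models the skip-past-last behaviour known from SQL `AFTER MATCH SKIP PAST LAST ROW`. It is not the PR's implementation: `SkipPastLastSketch`, `Match` and `skipPastLast` are hypothetical names, a match is reduced to an inclusive range of event positions, and candidates that start at the same position simply keep their input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of skip-past-last pruning; not Flink's NFA code. */
final class SkipPastLastSketch {

    /** A candidate match covering event positions start..end, both inclusive. */
    static final class Match {
        final int start;
        final int end;

        Match(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ".." + end + "]";
        }
    }

    /** Keeps the earliest-starting match and drops candidates that overlap an emitted one. */
    static List<Match> skipPastLast(List<Match> candidates) {
        List<Match> sorted = new ArrayList<>(candidates);
        // Stable sort: the match that started first is considered first.
        sorted.sort(Comparator.comparingInt((Match m) -> m.start));
        List<Match> emitted = new ArrayList<>();
        int lastEnd = Integer.MIN_VALUE;
        for (Match m : sorted) {
            // A candidate survives only if it starts after the last emitted match ended.
            if (m.start > lastEnd) {
                emitted.add(m);
                lastEnd = m.end;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Match> candidates = Arrays.asList(
            new Match(1, 3), new Match(0, 2), new Match(3, 4), new Match(5, 6));
        // prints: [[0..2], [3..4], [5..6]]
        System.out.println(skipPastLast(candidates));
    }
}
```

Here `[1..3]` is dropped because it shares events with `[0..2]`, which started earlier, while `[3..4]` begins after that match ended and is kept.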

## Verifying this change



This change added tests:

- testSkipPastLastWithOneOrMoreAtBeginning
- testSkipBeforeOtherAlreadyCompleted

and adjusted all other tests in class `AfterMatchSkipITCase.java`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-after-first-match

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6171


commit aca1b71de9b342840043983c8e3eabecb5f0afd4
Author: Dawid Wysakowicz 
Date:   2018-06-14T15:10:05Z

[FLINK-9593] Unified After Match semantics with SQL MATCH_RECOGNIZE






[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-06-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513494#comment-16513494
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6171
  
I would appreciate it if you had a look, @kl0u.

