[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...

2017-09-15 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4632
  
I added the test case to the `CEP Operator` and addressed the line. Thanks 
so much.


---


[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...

2017-09-15 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4673
  
Hi @dawidwys , Thanks for the tip, that's what I tried to add. Sorry for 
the missing times().


---


[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...

2017-09-14 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4673
  
Hi @dawidwys ,
I just found an issue in the after-match feature that prevents 
`AfterMatchSkipStrategy` from taking effect. It was probably introduced while 
resolving merge conflicts. I addressed it in this PR and added a simple test to 
catch this kind of problem. Please merge this into the repository. Thanks.


---


[GitHub] flink pull request #4673: [hotfix] [cep] Fix afterMatchStrategy parameter mi...

2017-09-14 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4673

[hotfix] [cep] Fix afterMatchStrategy parameter missing issue



## What is the purpose of the change
Fix the missing `afterMatchSkipStrategy` parameter in the call to 
`nfa.process()`. The issue was probably introduced while resolving merge 
conflicts.

## Brief change log


## Verifying this change

This change added tests and can be verified as follows:
Added an after-match test case to `CEPITCase` to catch this kind of problem.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink aftermatch-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4673


commit 060dcde4bd9662c98ade061c1698fcff5bb702b3
Author: Yestin <873915...@qq.com>
Date:   2017-09-14T16:01:53Z

[hotfix] [cep] Fix afterMatchStrategy parameter missing issue

This issue may be introduced during merging conflicts.




---


[GitHub] flink pull request #4632: [FLINK-7563] [cep] Fix watermark semantics in cep ...

2017-08-31 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4632

[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

## What is the purpose of the change

Correct the watermark semantics in cep.

## Brief change log

-  Treat an element as late when `timestamp <= watermark`.
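
The rule in the change log comes down to a single comparison. The snippet below is a minimal illustration, not the code from this PR; `LatenessCheck` and `isLate` are hypothetical names introduced only for this sketch.

```java
/** Hypothetical helper illustrating the lateness rule; not part of Flink. */
final class LatenessCheck {

    private LatenessCheck() {}

    /**
     * A watermark W declares that no more elements with a timestamp <= W
     * are expected, so an element whose timestamp equals the watermark
     * already counts as late.
     */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }
}
```

With a watermark of 5, elements stamped 4 and 5 are late and an element stamped 6 is on time. A strict `<` comparison would wrongly accept the element stamped 5.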

## Verifying this change

This change is already covered by existing tests, such as 
`testCEPOperatorCleanupEventTime()` and 
`testCEPOperatorSerializationWRocksDB()` in `CEPOperatorTest.java`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7563

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4632.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4632


commit 419921c81be20b0c00dfeaba287f603af2600fad
Author: Yestin <873915...@qq.com>
Date:   2017-08-31T21:09:30Z

[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-08-30 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys Thanks for the reviews and comments, please change the 
documentation during merge.
And @kl0u , thanks for the reviews!


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-30 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r136169035
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))
 
 
 
+### After Match Skip Strategy
+
+For a given pattern, there can be many successful matches as data stream 
flows. In order to control how to restart the match process after a successful 
match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. 
There're four types of skip strategies, listed as follows:
--- End diff --

That sounds good to me, please change it during merge. Thanks a lot.


---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-08-29 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
Hi @dawidwys , sorry for the late response. 
Thanks for your reviews, I have updated the test and the document. Please 
take a look if you have time. Thanks.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-16 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133478539
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
         return Tuple2.of(result, timeoutResult);
     }
 
+    private void discardComputationStatesAccordingToStrategy(Queue<ComputationState> computationStates,
--- End diff --

You are absolutely right. Thanks for the tip.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-16 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133478273
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -150,6 +160,29 @@ long getWindowTime() {
     }
 
     /**
+     * Check pattern after match skip strategy.
+     */
--- End diff --

We only need to check the skip strategy before compiling the `Pattern` to an 
`NFA`, so I think it's more reasonable to place the check here. We also need to 
check whether the `patternName` field in the `AfterMatchSkipStrategy` is a valid 
reference, which cannot be done easily in the `Pattern` class.
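
A reference check of this kind needs the complete pattern chain, which is only available once the whole sequence has been built. The sketch below shows the idea on a simplified stand-in; `PatternNode`, `SkipStrategyCheck`, and `isValidReference` are hypothetical names, not Flink's `Pattern` or `NFACompiler` API.

```java
import java.util.Objects;

/** Simplified stand-in for a pattern chain; not Flink's Pattern class. */
final class PatternNode {
    final String name;
    final PatternNode previous; // null for the first pattern in the sequence

    PatternNode(String name, PatternNode previous) {
        this.name = Objects.requireNonNull(name);
        this.previous = previous;
    }
}

/** Hypothetical validation helper. */
final class SkipStrategyCheck {

    private SkipStrategyCheck() {}

    /** Walks back from the last pattern and reports whether the name is referenced. */
    static boolean isValidReference(PatternNode last, String patternName) {
        // Test for null before reading the name, so an unknown name ends the
        // loop instead of dereferencing null.
        for (PatternNode p = last; p != null; p = p.previous) {
            if (p.name.equals(patternName)) {
                return true;
            }
        }
        return false;
    }
}
```

A caller would throw its own validation exception when `isValidReference` returns `false`; the exception type and message are left out here on purpose.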


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-14 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r133091608
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
         return Tuple2.of(result, timeoutResult);
     }
 
+    private void discardComputationStatesAccordingToStrategy(Queue<ComputationState> computationStates,
+        Collection<Map<String, List>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+        Set discardEvents = new HashSet<>();
+        switch(afterMatchSkipStrategy.getStrategy()) {
+            case SKIP_TO_LAST:
+                for (Map<String, List> resultMap: matchedResult) {
+                    for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) {
+                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+                            discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+                            break;
+                        } else {
+                            discardEvents.addAll(keyMatches.getValue());
+                        }
+                    }
+                }
+                break;
+            case SKIP_TO_FIRST:
+                for (Map<String, List> resultMap: matchedResult) {
+                    for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) {
+                        if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+                            break;
+                        } else {
+                            discardEvents.addAll(keyMatches.getValue());
+                        }
+                    }
+                }
+                break;
+            case SKIP_PAST_LAST_EVENT:
+                for (Map<String, List> resultMap: matchedResult) {
+                    for (List eventList: resultMap.values()) {
+                        discardEvents.addAll(eventList);
+                    }
+                }
+                break;
+        }
+        if (!discardEvents.isEmpty()) {
+            List<ComputationState> discardStates = new ArrayList<>();
+            for (ComputationState computationState : computationStates) {
+                Map<String, List> partialMatch = extractCurrentMatches(computationState);
+                for (List list: partialMatch.values()) {
+                    for (T e: list) {
+                        if (discardEvents.contains(e)) {
+                            // discard the computation state.
+                            eventSharedBuffer.release(
+                                NFAStateNameHandler.getOriginalNameFromInternal(
+                                    computationState.getState().getName()),
+                                computationState.getEvent(),
+                                computationState.getTimestamp(),
+                                computationState.getCounter()
+                            );
+                            discardStates.add(computationState);
--- End diff --

Yes, you are right. Thanks for pointing it out!
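
For readers following the quoted hunk, the discard rule for a single match can be sketched in isolation. This is a simplified illustration, not the PR's implementation: `DiscardSketch` and `eventsToDiscard` are hypothetical names, the enum is a local copy of the strategy names used in the thread, and the sketch assumes the match map iterates in pattern order (hence a `LinkedHashMap` in the usage note).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, self-contained illustration of the discard rule. */
final class DiscardSketch {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private DiscardSketch() {}

    /** Returns the events of one match that later partial matches may no longer contain. */
    static <T> Set<T> eventsToDiscard(Map<String, List<T>> match, Strategy strategy, String patternName) {
        Set<T> discard = new HashSet<>();
        switch (strategy) {
            case SKIP_PAST_LAST_EVENT:
                // Every event of the match is consumed.
                for (List<T> events : match.values()) {
                    discard.addAll(events);
                }
                break;
            case SKIP_TO_FIRST:
            case SKIP_TO_LAST:
                for (Map.Entry<String, List<T>> entry : match.entrySet()) {
                    List<T> events = entry.getValue();
                    if (entry.getKey().equals(patternName)) {
                        // SKIP_TO_LAST also drops all but the last event of the named pattern.
                        if (strategy == Strategy.SKIP_TO_LAST && !events.isEmpty()) {
                            discard.addAll(events.subList(0, events.size() - 1));
                        }
                        break; // leaves the for loop; later patterns are kept
                    }
                    // Patterns before the named one are dropped entirely.
                    discard.addAll(events);
                }
                break;
            default:
                // SKIP_TO_NEXT_EVENT discards nothing.
                break;
        }
        return discard;
    }
}
```

For a match `{a=[a1, a2], b=[b1, b2, b3], c=[c1]}` held in a `LinkedHashMap`, `SKIP_TO_FIRST` on `b` yields `{a1, a2}`, and `SKIP_TO_LAST` on `b` yields `{a1, a2, b1, b2}`.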


---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-08-12 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
Thanks for your reviews @dawidwys ! I'll update the doc in the following 
commits.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-10 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132386948
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -150,6 +160,59 @@ long getWindowTime() {
     }
 
     /**
+     * Check pattern after match skip strategy.
+     */
+    private void checkPatternSkipStrategy() {
+        AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+        if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+            afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+            Pattern<T, ?> pattern = currentPattern;
+            while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                pattern = pattern.getPrevious();
+            }
+            // pattern name match check.
+            if (pattern == null) {
+                throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                    "can not be found in the given Pattern");
+            } else {
+                // can not be used with optional states.
+                if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                    throw new MalformedPatternException("the AfterMatchSkipStrategy "
+                        + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
+                }
+            }
+
+            // start position check.
+            if (pattern.getPrevious() == null) {
--- End diff --

Great, I'll just remove all of those optional-state checks.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-09 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132349108
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_EVENT,
+ * SKIP_TO_NEXT_EVENT,
+ * SKIP_TO_FIRST_PATTERN and
+ * SKIP_TO_LAST_PATTERN
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
+
+   // fields
+   String patternName = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
--- End diff --

Good idea, I like that.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-08-09 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r132348950
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
@@ -150,6 +160,59 @@ long getWindowTime() {
     }
 
     /**
+     * Check pattern after match skip strategy.
+     */
+    private void checkPatternSkipStrategy() {
+        AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
+        if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
+            afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+            Pattern<T, ?> pattern = currentPattern;
+            while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+                pattern = pattern.getPrevious();
+            }
+            // pattern name match check.
+            if (pattern == null) {
+                throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
+                    "can not be found in the given Pattern");
+            } else {
+                // can not be used with optional states.
+                if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+                    throw new MalformedPatternException("the AfterMatchSkipStrategy "
+                        + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
+                }
+            }
+
+            // start position check.
+            if (pattern.getPrevious() == null) {
--- End diff --

I agree with you that the fallback approach is much easier to understand 
and maintain.
If we discard nothing, the actual semantics is to use SKIP_TO_NEXT_EVENT for 
the next match process. But that changes the matching semantics, which may 
lead to incorrect results, so I think users should be aware of what happens. 
My original thought was to add a configuration switch that lets users choose 
between throwing an exception and falling back to a default skip strategy.
Do you have any ideas about that?
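
The configuration switch proposed above could look roughly like the following. It only illustrates the idea under discussion: `SkipFallbackSketch`, `resolve`, and the `failOnMissingReference` flag are hypothetical and do not exist in Flink.

```java
/** Hypothetical sketch of the proposed switch; none of these names exist in Flink. */
final class SkipFallbackSketch {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private SkipFallbackSketch() {}

    /**
     * Chooses the strategy to apply to one match.
     *
     * @param requested              strategy configured by the user
     * @param referenceMatched       whether the named pattern is part of this match
     * @param failOnMissingReference true to throw, false to fall back silently
     */
    static Strategy resolve(Strategy requested, boolean referenceMatched, boolean failOnMissingReference) {
        boolean needsReference =
            requested == Strategy.SKIP_TO_FIRST || requested == Strategy.SKIP_TO_LAST;
        if (!needsReference || referenceMatched) {
            return requested;
        }
        if (failOnMissingReference) {
            throw new IllegalStateException(
                "Pattern referenced by " + requested + " is not part of the match");
        }
        // Fallback: behave like SKIP_TO_NEXT_EVENT for this match only.
        return Strategy.SKIP_TO_NEXT_EVENT;
    }
}
```

Because `resolve` is evaluated per match, the configured strategy applies again as soon as a later match contains the referenced pattern.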


---


[GitHub] flink pull request #4479: [FLINK-7309][hotfix] fix NullPointerException when...

2017-08-04 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4479

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields

## What is the purpose of the change

This pull request addresses FLINK-7309 by adding a null check before 
unboxing input fields.

## Brief change log

- Add null check before applying unboxing on input fields.
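
The failure mode behind this change can be reproduced in plain Java: unboxing a `null` wrapper throws a `NullPointerException`. The sketch below is not the code touched by this PR; `NullSafeUnboxing` and its methods are hypothetical, and returning a default value for the null case is an assumption made only for illustration.

```java
/** Hypothetical illustration of guarding an unboxing conversion. */
final class NullSafeUnboxing {

    private NullSafeUnboxing() {}

    /** Unboxes directly; throws NullPointerException when the field is null. */
    static long unboxUnchecked(Long field) {
        return field;
    }

    /** Checks for null before unboxing. */
    static long unboxOrDefault(Long field, long defaultValue) {
        if (field == null) {
            return defaultValue;
        }
        return field.longValue();
    }
}
```

`unboxUnchecked(null)` fails at the implicit `longValue()` call, while `unboxOrDefault(null, 0L)` returns `0`.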

## Verifying this change

This change added tests and can be verified as follows:
 - Added a test case that selects a null value from a field of type Timestamp.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7309

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4479


commit 7cbbd13b723df11e72ccb115b5266104b0b01183
Author: Yestin <873915...@qq.com>
Date:   2017-08-04T09:21:03Z

[FLINK-7309][hotfix] fix NullPointerException when selecting null fields.




---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-08-04 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys @dianfu I've updated the approach according to the document. Feel 
free to comment.


---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-07-31 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dianfu Thanks for your review. 
I found that @dawidwys wrote a draft about the implementation of this JIRA. 
I'll go through that first and address those issues in this PR later. 


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-31 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317411
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_EVENT:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, event));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getPatternName()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, event);
+                if (callLevel > 0) {
--- End diff --

Because we need to detect whether there is an infinite loop; I use 
`callLevel` to track that here.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-31 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317270
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_EVENT:
+            if (nextState.isFinal()) {
--- End diff --

Thanks for pointing it out.


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-31 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r130317122
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_EVENT:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, event));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getPatternName()) &&
--- End diff --

Yes, you are right.


---


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

2017-07-27 Thread yestinchen
Github user yestinchen commented on the issue:

https://github.com/apache/flink/pull/4331
  
@dawidwys Thanks for the review. 

Problem 1 is easy to fix: we can just start a new match process if the only 
remaining computation state reaches stopState.
Problem 2 cannot be avoided with the current approach. It's impossible to 
know whether there are potential matches. 

I think the best way to implement this correctly is to try to start a new 
match process after processing each event, and to discard unfinished match 
processes after a successful match according to the skip strategy. In order to 
do that, we need to keep the logical order of the events, which is the original 
idea I proposed. 

As for your general notes, I have some ideas:

1. I agree that Oracle's specification is designed for bounded data. 
But `MATCH_RECOGNIZE` on unbounded data is very similar to bounded data, since 
all data is processed one by one and there's no need for bound information. 
As for the **_empty match_**, I think we can just use Oracle's definition.
> Some patterns permit empty matches. For example:
> PATTERN (A*)
> can be matched by zero or more rows that are mapped to A.
> An empty match does not map any rows to primary row pattern variables; 
> nevertheless, an empty match has a starting row. For example, there can be an 
> empty match at the first row of a row pattern partition, an empty match at the 
> second row of a row pattern partition, etc. An empty match is assigned a 
> sequential match number, based on the ordinal position of its starting row, the 
> same as any other match.

2. I feel uncomfortable with the RuntimeExceptions too. But these 
exceptions are very important for keeping the skip semantics right. I understand 
your main concern is that exceptions will stop the matching process, which is 
unacceptable for an online streaming service. To address this, I think we can 
introduce a default strategy (SKIP_TO_NEXT_EVENT, for example). If such an 
exception happens, we can use the default strategy to continue the match 
process and change the strategy back after a successful match. We can also add 
a switch to let users decide whether to enable this feature.

3. I still think it's useful to support these skip strategies. I don't know 
why Esper does not support them.

4. Thanks for the related information. I took a brief look at the PR, which 
is very similar to this PR. I wonder why it was closed without being merged 
into master?

Looking forward to your feedback. Thanks.
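
The empty-match definition quoted in point 1 has a close analogue in ordinary regular expressions, which may help when reading it. The sketch below is only an analogy built on `java.util.regex`, not CEP or `MATCH_RECOGNIZE` code; `EmptyMatchSketch` is a hypothetical name. It numbers every match of `A*` by the position of its start, including the matches that consume nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Analogy only: empty matches of A* in a plain string. */
final class EmptyMatchSketch {

    private EmptyMatchSketch() {}

    /** Returns "matchNumber@start:text" for every match of A* in the input. */
    static List<String> matches(String input) {
        Matcher matcher = Pattern.compile("A*").matcher(input);
        List<String> result = new ArrayList<>();
        int matchNumber = 0;
        while (matcher.find()) {
            matchNumber++;
            result.add(matchNumber + "@" + matcher.start() + ":" + matcher.group());
        }
        return result;
    }
}
```

For the input `"BAB"` this produces four matches: an empty one starting at position 0, `A` at position 1, and empty ones at positions 2 and 3. Each empty match still has a start position and a sequential number, which is the property the quoted definition relies on.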


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129518272
  
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
             nextVersion,
             startTimestamp);
     }
+
+    switch (skipStrategy.getStrategy()) {
+        case SKIP_PAST_LAST_ROW:
+            if (nextState.isFinal()) {
+                resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+            }
+            break;
+        case SKIP_TO_FIRST:
+            if (nextState.getName().equals(skipStrategy.getRpv()) &&
+                !nextState.getName().equals(currentState.getName())) {
+                ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
--- End diff --

Now I keep `startComputationState` instead of `startState` in NFA, so it 
can calculate the `outgoingEdges` from the start state when needed. Is this 
right? 


---


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517227
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
--- End diff --

done




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517254
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
+
+   public AfterMatchSkipStrategy(){
+   this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy) {
+   this(strategy, null);
+   }
+
+   public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
+   if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
+   if (rpv == null) {
+   throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
+   }
+   }
+   this.strategy = strategy;
+   this.rpv = rpv;
+   }
+
+   public SkipStrategy getStrategy() {
+   return strategy;
+   }
+
+   public String getRpv() {
+   return rpv;
+   }
+
+   @Override
+   public String toString() {
+   return "AfterMatchStrategy{" +
+   "strategy=" + strategy +
+   ", rpv=" + rpv +
+   '}';
+   }
+
+   /**
+* Skip Strategy Enum.
+*/
+   public enum SkipStrategy{
+   SKIP_TO_NEXT_ROW,
+   SKIP_PAST_LAST_ROW,
+   SKIP_TO_FIRST,
+   SKIP_TO_LAST
+   }
+
+   /**
+* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
+*/
+   public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+
+   private static final int VERSION = 1;
+
+   /**
+* This empty constructor is required for deserializing the configuration.
+*/
+   public AfterMatchSkipStrategyConfigSnapshot() {
+   }
+
+   public AfterMatchSkipStrategyConfigSnapshot(
+   TypeSerializer enumSerializer,
+   TypeSerializer stringSerializer) {
+
+   super(enumSerializer, stringSerializer);
+   }
+
+   @Override
+   public int getVersion() {
+   return VERSION;
+   }
+   }
+
+   /**
+*  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
+*/
+   public static cl

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129517161
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
+ * SKIP_TO_FIRST_RPV and
+ * SKIP_TO_LAST_RPV
+ * 
+ */
+public class AfterMatchSkipStrategy implements Serializable {
+
+   // default strategy
+   SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
+
+   // fields
+   String rpv = null;
--- End diff --

It stands for Row Pattern Variable. I have already renamed it to 
`patternName`, which I think is clearer.




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516786
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies:
+ * SKIP_PAST_LAST_ROW,
+ * SKIP_TO_NEXT_ROW,
--- End diff --

I changed `ROW` to `EVENT`. Is that better?




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129516650
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
@@ -36,6 +40,14 @@
 * @return Resulting pattern stream
 */
public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
-   return new PatternStream<>(input, pattern);
+   return new PatternStream<>(input, pattern, skipStrategy);
+   }
+
+   /**
+* Set the pattern's skip strategy after match.
+* @param afterMatchSkipStrategy the skip strategy to use.
+*/
+   public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
--- End diff --

Changed that into `Pattern`




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-26 Thread yestinchen
Github user yestinchen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4331#discussion_r129515022
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) {
nextVersion,
startTimestamp);
}
+
+   switch (skipStrategy.getStrategy()) {
+       case SKIP_PAST_LAST_ROW:
+           if (nextState.isFinal()) {
+               resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
+           }
+           break;
+       case SKIP_TO_FIRST:
+           if (nextState.getName().equals(skipStrategy.getRpv()) &&
+               !nextState.getName().equals(currentState.getName())) {
+               ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges);
+               if (callLevel > 0) {
+                   throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
+               }
+               // feed current matched event to the state.
+               Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
--- End diff --

This is because SKIP_TO_FIRST and SKIP_TO_LAST need to start the next match 
process at the first or last event matched by the specified pattern. For 
example, take the event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If 
we set the SkipStrategy to SKIP_TO_FIRST with the pattern name `B`, we should 
create a new `startComputationState` once `b1` has been processed, and the next 
match should start at event `b1`. So we need to manually feed `b1` to the newly 
created `startComputationState`. 
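
To make the example above concrete, here is a minimal standalone sketch of where the next match attempt would start under each strategy. It is not the NFA code from this PR: the class `SkipStrategySketch` and the method `nextStartIndex` are hypothetical names used only for illustration, and the sketch assumes the matched events are contiguous in the stream.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of flink-cep.
class SkipStrategySketch {

    public enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the stream index at which the next match attempt starts.
     *
     * @param strategy    the skip strategy to apply
     * @param patternName pattern name used by SKIP_TO_FIRST / SKIP_TO_LAST (may be null otherwise)
     * @param matchStart  stream index of the first event of the completed match
     * @param names       pattern name assigned to each matched event, in match order
     *                    (assumption: matched events are contiguous in the stream)
     */
    public static int nextStartIndex(SkipStrategy strategy, String patternName,
                                     int matchStart, List<String> names) {
        switch (strategy) {
            case SKIP_TO_NEXT_ROW:
                // Resume right after the first event of the match.
                return matchStart + 1;
            case SKIP_PAST_LAST_ROW:
                // Resume right after the last event of the match.
                return matchStart + names.size();
            case SKIP_TO_FIRST:
                // Resume at the first event matched by the named pattern.
                return matchStart + requireFound(names.indexOf(patternName), patternName);
            case SKIP_TO_LAST:
                // Resume at the last event matched by the named pattern.
                return matchStart + requireFound(names.lastIndexOf(patternName), patternName);
            default:
                throw new IllegalArgumentException("unknown strategy " + strategy);
        }
    }

    private static int requireFound(int index, String patternName) {
        if (index < 0) {
            throw new IllegalArgumentException("pattern name not in match: " + patternName);
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a1", "b1", "c1", "a2");
        List<String> names = Arrays.asList("A", "B", "C"); // match (A B C) = a1, b1, c1

        for (SkipStrategy strategy : SkipStrategy.values()) {
            int next = nextStartIndex(strategy, "B", 0, names);
            String event = next < stream.size() ? stream.get(next) : "(end of stream)";
            System.out.println(strategy + " -> next match starts at " + event);
        }
    }
}
```

With SKIP_TO_FIRST and the pattern name `B`, the sketch returns index 1, which is `b1`, the event that has to be fed again to the new start state.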




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

2017-07-14 Thread yestinchen
GitHub user yestinchen opened a pull request:

https://github.com/apache/flink/pull/4331

[FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yestinchen/flink FLINK-7169

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4331


commit 28f2e0ab2b6fd38864017fc64d2c76a65c8f7574
Author: Yestin <873915...@qq.com>
Date:   2017-07-14T08:41:51Z

[FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP



