flink git commit: [hotfix] [cep] Fix afterMatchStrategy parameter missing issue

2017-09-17 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/master 258b385f4 -> 1269f75ee


[hotfix] [cep] Fix afterMatchStrategy parameter missing issue

This closes #4673


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1269f75e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1269f75e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1269f75e

Branch: refs/heads/master
Commit: 1269f75ee86f9c3c5dfcd0c05ee39c974bd73fa5
Parents: 258b385
Author: Yestin <873915...@qq.com>
Authored: Thu Sep 14 12:01:53 2017 -0400
Committer: Dawid Wysakowicz 
Committed: Sun Sep 17 18:36:35 2017 +0200

--
 .../AbstractKeyedCEPPatternOperator.java|  2 +-
 .../java/org/apache/flink/cep/CEPITCase.java| 35 
 2 files changed, 36 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1269f75e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 01aad76..2415a2e 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -329,7 +329,7 @@ public abstract class AbstractKeyedCEPPatternOperator>, 
Collection, Long>>> patterns =
-   nfa.process(event, timestamp);
+   nfa.process(event, timestamp, afterMatchSkipStrategy);
 
try {
processMatchedSequences(patterns.f0, timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/1269f75e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 78e485e..81b83a3 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -679,4 +680,38 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
return Double.compare(o1.getPrice(), o2.getPrice());
}
}
+
+   @Test
+   public void testSimpleAfterMatchSkip() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   DataStream<Tuple2<Integer, String>> input = env.fromElements(
+   new Tuple2<>(1, "a"),
+   new Tuple2<>(2, "a"),
+   new Tuple2<>(3, "a"),
+   new Tuple2<>(4, "a"));
+
+   Pattern<Tuple2<Integer, String>, ?> pattern =
+   Pattern.<Tuple2<Integer, String>>begin("start", 
AfterMatchSkipStrategy.skipPastLastEvent())
+   .where(new SimpleCondition<Tuple2<Integer, String>>() {
+   @Override
+   public boolean filter(Tuple2<Integer, String> rec) throws Exception {
+   return rec.f1.equals("a");
+   }
+   }).times(2);
+
+   PatternStream<Tuple2<Integer, String>> pStream = 
CEP.pattern(input, pattern);
+
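+   DataStream<Tuple2<Integer, String>> result = pStream.select(new 
PatternSelectFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
+   @Override
+   public Tuple2<Integer, String> select(Map<String, List<Tuple2<Integer, String>>> pattern) throws Exception {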
+   return pattern.get("start").get(0);
+ 

flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

2017-09-17 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/release-1.3 73c3284ff -> b63955ed9


[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

This closes #4632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63955ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63955ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63955ed

Branch: refs/heads/release-1.3
Commit: b63955ed90d12afd32b443947a55f63069da3774
Parents: 73c3284
Author: Yestin <873915...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz 
Committed: Sun Sep 17 18:18:59 2017 +0200

--
 .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java  | 4 ++--
 .../java/org/apache/flink/cep/operator/CEPOperatorTest.java  | 8 +---
 2 files changed, 7 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index af4b53e..0e58f6d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -176,10 +176,10 @@ public abstract class AbstractKeyedCEPPatternOperator
IN value = element.getValue();
 
// In event-time processing we assume correctness of 
the watermark.
-   // Events with timestamp smaller than the last seen 
watermark are considered late.
+   // Events with timestamp smaller than or equal with the 
last seen watermark are considered late.
// Late events are put in a dedicated side output, if 
the user has specified one.
 
-   if (timestamp >= lastWatermark) {
+   if (timestamp > lastWatermark) {
 
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper 
watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..0b0156e 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -373,6 +373,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
 
+   // this is a late event, because timestamp(12) = last 
watermark(12)
harness.processElement(new 
StreamRecord(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 
13L));
harness.processWatermark(20L);
@@ -382,8 +383,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
 
+   assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
-   verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -647,9 +649,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new 
StreamRecord(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 
4));
harness.processWatermark(5L);
-   harness.processElement(new StreamRecord<>(nextOne, 6));
+   harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new 

flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

2017-09-17 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/master 312e08534 -> 258b385f4


[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

This closes #4632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258b385f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258b385f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258b385f

Branch: refs/heads/master
Commit: 258b385f4f23a1c20c85d999d1ae2bf5b0be45db
Parents: 312e085
Author: Yestin <873915...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz 
Committed: Sun Sep 17 18:11:43 2017 +0200

--
 .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java  | 4 ++--
 .../java/org/apache/flink/cep/operator/CEPOperatorTest.java  | 8 +---
 2 files changed, 7 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index ae2d7e4..01aad76 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
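@@ -170,10 +170,10 @@ public abstract class AbstractKeyedCEPPatternOperator
IN value = element.getValue();
 
// In event-time processing we assume correctness of 
the watermark.
-   // Events with timestamp smaller than the last seen 
watermark are considered late.
+   // Events with timestamp smaller than or equal with the 
last seen watermark are considered late.
// Late events are put in a dedicated side output, if 
the user has specified one.
 
-   if (timestamp >= lastWatermark) {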
+   if (timestamp > lastWatermark) {
 
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper 
watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 9eb60de..ed8b923 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -604,6 +604,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
 
+   // this is a late event, because timestamp(12) = last 
watermark(12)
harness.processElement(new 
StreamRecord(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 
13L));
harness.processWatermark(20L);
@@ -613,8 +614,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
 
+   assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
-   verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -871,9 +873,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new 
StreamRecord(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 
4));
harness.processWatermark(5L);
-   harness.processElement(new StreamRecord<>(nextOne, 6));
+   harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new