flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

2017-09-17 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/release-1.3 73c3284ff -> b63955ed9


[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

This closes #4632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63955ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63955ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63955ed

Branch: refs/heads/release-1.3
Commit: b63955ed90d12afd32b443947a55f63069da3774
Parents: 73c3284
Author: Yestin <873915...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz 
Committed: Sun Sep 17 18:18:59 2017 +0200

--
 .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java  | 4 ++--
 .../java/org/apache/flink/cep/operator/CEPOperatorTest.java  | 8 +---
 2 files changed, 7 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index af4b53e..0e58f6d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -176,10 +176,10 @@ public abstract class AbstractKeyedCEPPatternOperator
IN value = element.getValue();
 
// In event-time processing we assume correctness of 
the watermark.
-   // Events with timestamp smaller than the last seen 
watermark are considered late.
+   // Events with timestamp smaller than or equal with the 
last seen watermark are considered late.
// Late events are put in a dedicated side output, if 
the user has specified one.
 
-   if (timestamp >= lastWatermark) {
+   if (timestamp > lastWatermark) {
 
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper 
watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..0b0156e 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -373,6 +373,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
 
+   // this is a late event, because timestamp(12) = last 
watermark(12)
harness.processElement(new 
StreamRecord(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 
13L));
harness.processWatermark(20L);
@@ -382,8 +383,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
 
+   assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
-   verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -647,9 +649,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new 
StreamRecord(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 
4));
harness.processWatermark(5L);
-   harness.processElement(new StreamRecord<>(nextOne, 6));
+   harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new 

flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

2017-09-17 Thread dwysakowicz
Repository: flink
Updated Branches:
  refs/heads/master 312e08534 -> 258b385f4


[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

This closes #4632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258b385f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258b385f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258b385f

Branch: refs/heads/master
Commit: 258b385f4f23a1c20c85d999d1ae2bf5b0be45db
Parents: 312e085
Author: Yestin <873915...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz 
Committed: Sun Sep 17 18:11:43 2017 +0200

--
 .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java  | 4 ++--
 .../java/org/apache/flink/cep/operator/CEPOperatorTest.java  | 8 +---
 2 files changed, 7 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index ae2d7e4..01aad76 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -170,10 +170,10 @@ public abstract class AbstractKeyedCEPPatternOperator= lastWatermark) {
+   if (timestamp > lastWatermark) {
 
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper 
watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 9eb60de..ed8b923 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -604,6 +604,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
 
+   // this is a late event, because timestamp(12) = last 
watermark(12)
harness.processElement(new 
StreamRecord(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 
13L));
harness.processWatermark(20L);
@@ -613,8 +614,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
 
+   assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent2, endEvent2);
-   verifyPattern(harness.getOutput().poll(), startEvent2, 
middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -871,9 +873,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new 
StreamRecord(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 
4));
harness.processWatermark(5L);
-   harness.processElement(new StreamRecord<>(nextOne, 6));
+   harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new