Repository: flink
Updated Branches:
refs/heads/release-1.3 73c3284ff -> b63955ed9
[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.
This closes #4632
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63955ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63955ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63955ed
Branch: refs/heads/release-1.3
Commit: b63955ed90d12afd32b443947a55f63069da3774
Parents: 73c3284
Author: Yestin <873915...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz
Committed: Sun Sep 17 18:18:59 2017 +0200
--
.../flink/cep/operator/AbstractKeyedCEPPatternOperator.java | 4 ++--
.../java/org/apache/flink/cep/operator/CEPOperatorTest.java | 8 +++++---
2 files changed, 7 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index af4b53e..0e58f6d 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -176,10 +176,10 @@ public abstract class AbstractKeyedCEPPatternOperator
IN value = element.getValue();
// In event-time processing we assume correctness of
the watermark.
- // Events with timestamp smaller than the last seen
watermark are considered late.
+ // Events with timestamp smaller than or equal with the
last seen watermark are considered late.
// Late events are put in a dedicated side output, if
the user has specified one.
- if (timestamp >= lastWatermark) {
+ if (timestamp > lastWatermark) {
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper
watermark.
http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..0b0156e 100644
---
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -373,6 +373,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
+ // this is a late event, because timestamp(12) = last
watermark(12)
harness.processElement(new
StreamRecord<>(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2,
13L));
harness.processWatermark(20L);
@@ -382,8 +383,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
+ assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2,
middleEvent2, endEvent2);
- verifyPattern(harness.getOutput().poll(), startEvent2,
middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -647,9 +649,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new
StreamRecord<>(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2,
4));
harness.processWatermark(5L);
- harness.processElement(new StreamRecord<>(nextOne, 6));
+ harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new