flink git commit: [hotfix] [cep] Fix afterMatchStrategy parameter missing issue
Repository: flink Updated Branches: refs/heads/master 258b385f4 -> 1269f75ee [hotfix] [cep] Fix afterMatchStrategy parameter missing issue This closes #4673 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1269f75e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1269f75e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1269f75e Branch: refs/heads/master Commit: 1269f75ee86f9c3c5dfcd0c05ee39c974bd73fa5 Parents: 258b385 Author: Yestin <873915...@qq.com> Authored: Thu Sep 14 12:01:53 2017 -0400 Committer: Dawid WysakowiczCommitted: Sun Sep 17 18:36:35 2017 +0200 -- .../AbstractKeyedCEPPatternOperator.java| 2 +- .../java/org/apache/flink/cep/CEPITCase.java| 35 2 files changed, 36 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1269f75e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 01aad76..2415a2e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -329,7 +329,7 @@ public abstract class AbstractKeyedCEPPatternOperator >, Collection , Long>>> patterns = - nfa.process(event, timestamp); + nfa.process(event, timestamp, afterMatchSkipStrategy); try { processMatchedSequences(patterns.f0, timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/1269f75e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 78e485e..81b83a3 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.cep; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; @@ -679,4 +680,38 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { return Double.compare(o1.getPrice(), o2.getPrice()); } } + + @Test + public void testSimpleAfterMatchSkip() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream > input = env.fromElements( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "a"), + new Tuple2<>(3, "a"), + new Tuple2<>(4, "a")); + + Pattern , ?> pattern = + Pattern. >begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) + .where(new SimpleCondition >() { + @Override + public boolean filter(Tuple2 rec) throws Exception { + return rec.f1.equals("a"); + } + }).times(2); + + PatternStream > pStream = CEP.pattern(input, pattern); + + DataStream > result = pStream.select(new PatternSelectFunction , Tuple2 >() { + @Override + public Tuple2 select(Map >> pattern) throws Exception { + return pattern.get("start").get(0); +
flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.
Repository: flink Updated Branches: refs/heads/release-1.3 73c3284ff -> b63955ed9 [FLINK-7563] [cep] Fix watermark semantics in cep and related tests. This closes #4632 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b63955ed Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b63955ed Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b63955ed Branch: refs/heads/release-1.3 Commit: b63955ed90d12afd32b443947a55f63069da3774 Parents: 73c3284 Author: Yestin <873915...@qq.com> Authored: Thu Aug 31 17:09:30 2017 -0400 Committer: Dawid WysakowiczCommitted: Sun Sep 17 18:18:59 2017 +0200 -- .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java | 4 ++-- .../java/org/apache/flink/cep/operator/CEPOperatorTest.java | 8 +--- 2 files changed, 7 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index af4b53e..0e58f6d 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -176,10 +176,10 @@ public abstract class AbstractKeyedCEPPatternOperator IN value = element.getValue(); // In event-time processing we assume correctness of the watermark. - // Events with timestamp smaller than the last seen watermark are considered late. + // Events with timestamp smaller than or equal with the last seen watermark are considered late. // Late events are put in a dedicated side output, if the user has specified one. - if (timestamp >= lastWatermark) { + if (timestamp > lastWatermark) { // we have an event with a valid timestamp, so // we buffer it until we receive the proper watermark. http://git-wip-us.apache.org/repos/asf/flink/blob/b63955ed/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 38ad0f1..0b0156e 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -373,6 +373,7 @@ public class CEPOperatorTest extends TestLogger { verifyWatermark(harness.getOutput().poll(), 11L); verifyWatermark(harness.getOutput().poll(), 12L); + // this is a late event, because timestamp(12) = last watermark(12) harness.processElement(new StreamRecord(middleEvent3, 12L)); harness.processElement(new StreamRecord<>(endEvent2, 13L)); harness.processWatermark(20L); @@ -382,8 +383,9 @@ public class CEPOperatorTest extends TestLogger { assertTrue(!operator2.hasNonEmptyPQ(42)); assertEquals(0L, harness.numEventTimeTimers()); + assertEquals(3, harness.getOutput().size()); verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2); - verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2); + verifyWatermark(harness.getOutput().poll(), 20L); verifyWatermark(harness.getOutput().poll(), 21L); } finally { @@ -647,9 +649,9 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord(middleEvent2, 3)); harness.processElement(new StreamRecord<>(startEvent2, 4)); harness.processWatermark(5L); - harness.processElement(new StreamRecord<>(nextOne, 6)); + harness.processElement(new StreamRecord<>(nextOne, 7)); harness.processElement(new
flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.
Repository: flink Updated Branches: refs/heads/master 312e08534 -> 258b385f4 [FLINK-7563] [cep] Fix watermark semantics in cep and related tests. This closes #4632 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258b385f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258b385f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258b385f Branch: refs/heads/master Commit: 258b385f4f23a1c20c85d999d1ae2bf5b0be45db Parents: 312e085 Author: Yestin <873915...@qq.com> Authored: Thu Aug 31 17:09:30 2017 -0400 Committer: Dawid WysakowiczCommitted: Sun Sep 17 18:11:43 2017 +0200 -- .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java | 4 ++-- .../java/org/apache/flink/cep/operator/CEPOperatorTest.java | 8 +--- 2 files changed, 7 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index ae2d7e4..01aad76 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -170,10 +170,10 @@ public abstract class AbstractKeyedCEPPatternOperator = lastWatermark) { + if (timestamp > lastWatermark) { // we have an event with a valid timestamp, so // we buffer it until we receive the proper watermark. http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 9eb60de..ed8b923 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -604,6 +604,7 @@ public class CEPOperatorTest extends TestLogger { verifyWatermark(harness.getOutput().poll(), 11L); verifyWatermark(harness.getOutput().poll(), 12L); + // this is a late event, because timestamp(12) = last watermark(12) harness.processElement(new StreamRecord(middleEvent3, 12L)); harness.processElement(new StreamRecord<>(endEvent2, 13L)); harness.processWatermark(20L); @@ -613,8 +614,9 @@ public class CEPOperatorTest extends TestLogger { assertTrue(!operator2.hasNonEmptyPQ(42)); assertEquals(0L, harness.numEventTimeTimers()); + assertEquals(3, harness.getOutput().size()); verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2); - verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2); + verifyWatermark(harness.getOutput().poll(), 20L); verifyWatermark(harness.getOutput().poll(), 21L); } finally { @@ -871,9 +873,9 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord(middleEvent2, 3)); harness.processElement(new StreamRecord<>(startEvent2, 4)); harness.processWatermark(5L); - harness.processElement(new StreamRecord<>(nextOne, 6)); + harness.processElement(new StreamRecord<>(nextOne, 7)); harness.processElement(new