[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4502 @twalthr Have updated the PR and looking forward to your comments. ---
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r159149194 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -191,22 +191,28 @@ public boolean isEmpty() { */ public boolean prune(long pruningTimestamp) { Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); - boolean pruned = false; + List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>(); while (iter.hasNext()) { SharedBufferPage<K, V> page = iter.next().getValue(); - if (page.prune(pruningTimestamp)) { - pruned = true; - } + page.prune(pruningTimestamp, prunedEntries); if (page.isEmpty()) { // delete page if it is empty iter.remove(); } } - return pruned; + if (!prunedEntries.isEmpty()) { + for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) { + entry.getValue().removeEdges(prunedEntries); + } + prunedEntries.clear(); --- End diff -- Updated. ---
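The two-phase approach in the diff above (first collect the pruned entries, then sweep every page for edges that still reference them) can be illustrated with a generic, self-contained sketch. The types and names below are hypothetical stand-ins, not Flink's actual `SharedBuffer` API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the two-phase prune in the diff above:
// phase 1 removes expired entries while remembering them, phase 2 sweeps
// the edges so none keeps a dangling reference to a pruned entry.
public class TwoPhasePrune {

    // entries: name -> timestamp; edges: name -> names it references
    static Set<String> prune(Map<String, Long> entries,
                             Map<String, List<String>> edges,
                             long pruningTimestamp) {
        // phase 1: prune old entries, remembering which were removed
        Set<String> pruned = new HashSet<>();
        entries.entrySet().removeIf(e -> {
            if (e.getValue() < pruningTimestamp) {
                pruned.add(e.getKey());
                return true;
            }
            return false;
        });
        // phase 2: drop edges that still reference a pruned entry
        for (List<String> targets : edges.values()) {
            targets.removeAll(pruned);
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> entries = new HashMap<>();
        entries.put("a", 1L);
        entries.put("b", 5L);
        entries.put("c", 9L);
        Map<String, List<String>> edges = new HashMap<>();
        edges.put("b", new ArrayList<>(List.of("a")));
        edges.put("c", new ArrayList<>(List.of("a", "b")));

        prune(entries, edges, 4L);
        System.out.println(entries.keySet()); // "a" is gone
        System.out.println(edges.get("c"));   // [b], no dangling reference to "a"
    }
}
```

Without the second phase, `edges.get("c")` would still contain `"a"` after the prune, which is exactly the kind of dangling reference the fix removes.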
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r158909117 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -82,9 +82,12 @@ private transient Map<K, SharedBufferPage<K, V>> pages; + private final transient List<SharedBufferEntry<K, V>> prunedEntries; --- End diff -- Updated the PR. Thanks a lot for the review. :) ---
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r158908615 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -82,9 +82,12 @@ private transient Map<K, SharedBufferPage<K, V>> pages; + private final transient List<SharedBufferEntry<K, V>> prunedEntries; --- End diff -- Make sense, I will change it to local variable. ---
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r15833 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -82,9 +82,12 @@ private transient Map<K, SharedBufferPage<K, V>> pages; + private final transient List<SharedBufferEntry<K, V>> prunedEntries; --- End diff -- It is for object reuse. What's your thought? ---
[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5142 The page number is the number of patterns, so it is usually very small. The `long` is only needed in theory, just as you said, with MAX_INT * MAX_INT resulting in MAX_LONG. If this makes sense, then the `HashMap` should perhaps also be replaced, as the number of entries in one page may be MAX_LONG in theory too. Changing `int` to `long` will introduce a state incompatibility issue and we need to take care of that when performing serialization/deserialization. I just wonder whether it is worth making this change. ---
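As a side note on the compatibility concern above: widening a serialized count from `int` to `long` changes the on-disk byte layout, so old state written with 4-byte counts cannot be read back naively. A minimal illustration (hypothetical method names, not Flink's actual serializer format):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrates why changing a serialized field from int to long is state
// incompatible: the byte layout differs, so a deserializer expecting the new
// format would mis-read savepoints written in the old one unless migration
// logic handles both versions.
public class CountLayout {

    static byte[] writeCountAsInt(int count) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            new DataOutputStream(bos).writeInt(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static byte[] writeCountAsLong(long count) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            new DataOutputStream(bos).writeLong(count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(writeCountAsInt(42).length);   // 4 bytes in the old layout
        System.out.println(writeCountAsLong(42L).length); // 8 bytes in the new one
    }
}
```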
[GitHub] flink issue #5141: [FLINK-8226] [cep] Dangling reference generated after NFA...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5141 @dawidwys Could you help to take a look at this PR? This is a bug fix and the issue can be easily reproduced with the test case included in the PR. Thanks a lot. ---
[GitHub] flink issue #5142: [FLINK-8227] Optimize the performance of SharedBufferSeri...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5142 @dawidwys @StephanEwen Sorry for the late response. For questions 1 and 2, I have the same thoughts as @dawidwys and have updated the PR accordingly. For question 3, I think `int` is enough as we currently store `SharedBufferEntry` in a `HashMap` for each `SharedBufferPage`, and the size of a `HashMap` is an `int`. If we want to support `long`, we should also change `HashMap` to something else. What are your thoughts? ---
[GitHub] flink pull request #5142: [FLINK-8227] Optimize the performance of SharedBuf...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5142 [FLINK-8227] Optimize the performance of SharedBufferSerializer ## What is the purpose of the change *This pull request optimizes the performance of SharedBufferSerializer.* ## Verifying this change This change is a performance improvement without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink optimize_sharedbuffer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5142.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5142 commit b586abec579ef7f251333032c9385d7e71f3799b Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-12-09T03:51:04Z [FLINK-8227] Optimize the performance of SharedBufferSerializer ---
[GitHub] flink pull request #5141: [FLINK-8226] [cep] Dangling reference generated af...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5141 [FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry ## What is the purpose of the change *This pull request fixes the issue that a dangling reference is generated after the NFA cleans up a timed out SharedBufferEntry. An exception will be thrown when serializing the NFA.* ## Verifying this change This change added tests and can be verified as follows: - *Added test NFATest#testTimeoutWindowPruning2* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink dangling_ref Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5141.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5141 commit 982bfafaabcfbfd78f4fcbdd9438eab9c8be65bb Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-12-09T02:55:14Z [FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry ---
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 The currently known impact is that it will need to checkpoint more timers. For the impact on performance, I will do some benchmarks later. :) I think we only need to consider this optimization if there are cases where users have to use punctuated watermarks in a way that generates one watermark for every incoming event. I will reopen this PR if we find a persuasive use case and reason. :) ---
[GitHub] flink pull request #5036: [FLINK-8106] [cep] Optimize the timer logic in Abs...
Github user dianfu closed the pull request at: https://github.com/apache/flink/pull/5036 ---
[GitHub] flink issue #5036: [FLINK-8106] [cep] Optimize the timer logic in AbstractKe...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5036 Closing this PR for the same reason as #5063. ---
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 For historical reasons, we only use punctuated watermarks currently. But I think you are right, we should consider using periodic watermarks instead. Thanks a lot for your explanation. I will close this PR. ---
[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...
Github user dianfu closed the pull request at: https://github.com/apache/flink/pull/5063 ---
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 @fhueske Thanks a lot for your comments. I definitely agree with you on the motivation of punctuated watermarks. But in practice, there are many cases where there is no watermark information in the source data at all, so we have to generate one watermark for every incoming event. ---
[GitHub] flink pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrap...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5080 [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper ## What is the purpose of the change *This pull request adds rich function support for SelectWrapper and FlatSelectWrapper. If the wrapped functions are rich functions, they should be processed correctly.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink SelectFunction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5080.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5080 commit 4c3ccb008b38d44189578975b5eee9208561567b Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-27T12:50:30Z [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper ---
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 @fhueske Yes, the bad performance is related to the way we generate the watermarks, but this is how punctuated watermarks work. If we do not optimize for punctuated watermarks, then are we suggesting that users not use punctuated watermarks for unbounded over windows? For the side effect `By registering timers on different timestamps, we have many timers that all will fire when a watermark is received.`, I think we can add a variable such as `lastProcessedWatermark` to record the last processed watermark. Then there will not be much overhead. Thoughts? ---
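The `lastProcessedWatermark` idea suggested above could look roughly like the guard below. This is only a sketch with hypothetical names, not the actual `RowTimeUnboundedOver` code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the lastProcessedWatermark guard: when many timers fire on the
// same watermark, only the first one does the expensive per-watermark work;
// the rest return immediately, so the extra timers add little overhead.
public class WatermarkGuard {
    private long lastProcessedWatermark = Long.MIN_VALUE;
    private final List<Long> processed = new ArrayList<>();

    // called once for every timer that fires; 'watermark' is the current
    // watermark at firing time
    public void onTimer(long watermark) {
        if (watermark <= lastProcessedWatermark) {
            return; // already handled by an earlier timer on this watermark
        }
        lastProcessedWatermark = watermark;
        processed.add(watermark); // stand-in for the real sorting/emitting work
    }

    public List<Long> getProcessed() {
        return processed;
    }

    public static void main(String[] args) {
        WatermarkGuard guard = new WatermarkGuard();
        guard.onTimer(10L); // does the work
        guard.onTimer(10L); // skipped
        guard.onTimer(10L); // skipped
        guard.onTimer(20L); // does the work
        System.out.println(guard.getProcessed()); // [10, 20]
    }
}
```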
[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5063#discussion_r152955709 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala --- @@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver( // discard late record if (timestamp > curWatermark) { // ensure every key just registers one timer - ctx.timerService.registerEventTimeTimer(curWatermark + 1) + ctx.timerService.registerEventTimeTimer(timestamp) --- End diff -- @fhueske Thanks a lot for your comments. Your concern makes sense to me. I think the current implementation is ok under periodic watermarks, but I'm not sure it's optimal under punctuated watermarks. We will perform some performance tests for unbounded over under punctuated watermarks and share the results. ---
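The diff above trades fewer redundant `MapState` scans for more registered timers. Since Flink's timer service deduplicates timers per key and timestamp, the difference in timer count can be modeled with a set. This is a hypothetical one-key model, not Flink's actual timer service:

```java
import java.util.HashSet;
import java.util.Set;

// Rough model of timer registration for a single key. A Set stands in for the
// timer service, which deduplicates timers with identical timestamps.
public class TimerCount {

    static int timersRegistered(long[] eventTimestamps, long curWatermark,
                                boolean perEventTimestamp) {
        Set<Long> timers = new HashSet<>();
        for (long ts : eventTimestamps) {
            // old strategy: every event registers at curWatermark + 1, so all
            // registrations collapse into one timer;
            // new strategy: one timer per distinct event timestamp
            timers.add(perEventTimestamp ? ts : curWatermark + 1);
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] events = {100, 101, 102, 103};
        System.out.println(timersRegistered(events, 50, false)); // 1 timer
        System.out.println(timersRegistered(events, 50, true));  // 4 timers
    }
}
```

This is the side effect quoted in the discussion: per-timestamp registration creates one timer per distinct event time, all of which fire when the next watermark arrives.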
[GitHub] flink issue #5063: [FLINK-8144] [table] Optimize the timer logic in RowTimeU...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/5063 {{AbstractKeyedCEPPatternOperator}} has similar logic to {{RowTimeUnboundedOver}}. As described in FLINK-8106, we found that the performance is very bad under the current logic for {{AbstractKeyedCEPPatternOperator}}. After optimizing the timer logic, the throughput increased from 10+ tps to about 3500 tps for one operator in the case of RocksDBStateBackend. I think the optimization should also apply to {{RowTimeUnboundedOver}}. BTW: the watermark assigner we use in the CEP use case is {{AssignerWithPunctuatedWatermarks}}. It generates one watermark for every input element. ---
[GitHub] flink pull request #5063: [FLINK-8144] [table] Optimize the timer logic in R...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5063 [FLINK-8144] [table] Optimize the timer logic in RowTimeUnboundedOver ## What is the purpose of the change *This pull request optimizes the timer handling in RowTimeUnboundedOver. Currently the MapState will be scanned many times if the watermark arrives a few seconds later than the event.* ## Verifying this change This change is already covered by existing tests, such as *OverWindowHarnessTest.testRowTimeUnboundedRangeOver*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink optimize_timer_over Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5063.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5063 commit a7d7635be1b126283573ac5b55472a79d94ac0fb Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-24T09:28:02Z [FLINK-8144] [table] Optimize the timer logic in RowTimeUnboundedOver ---
[GitHub] flink pull request #5036: [FLINK-8106] [cep] Optimize the timer logic in Abs...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5036 [FLINK-8106] [cep] Optimize the timer logic in AbstractKeyedCEPPatternOperator ## What is the purpose of the change *This pull request optimizes the performance of AbstractKeyedCEPPatternOperator.* ## Verifying this change - *Existing tests* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink optimize_timer_cep Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5036.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5036 commit 74d432f998afe38ed85ff24481521e5db09805c0 Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-20T08:20:53Z [FLINK-8106] [cep] Optimize the timer logic in AbstractKeyedCEPPatternOperator ---
[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5025#discussion_r151625442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment( "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") } val outputType = sink.getOutputType +val resultType = getResultType(table.getRelNode, optimizedPlan) --- End diff -- The `resultType` generated from the optimized plan contains time indicator information. In `StreamTableEnvironment.translate`, this information is needed to transform the time indicator column to the `Timestamp` type. For the type consistency issue, it will be validated when converting CRow to the output type: https://github.com/apache/flink/blob/81dc260dc653085b9dbf098e8fd70a72d2d0828e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala#L941 ---
[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5025#discussion_r151609882 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala --- @@ -179,6 +179,32 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { } @Test + def testCalcMaterialization3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +val tEnv = TableEnvironment.getTableEnvironment(env) +MemoryTableSinkUtil.clear + +val stream = env + .fromCollection(data) + .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) +val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + +val t = table --- End diff -- Removed. ---
[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5025#discussion_r151609700 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment( "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") } val outputType = sink.getOutputType +val resultType = getResultType(table.getRelNode, optimizedPlan) --- End diff -- Do you mean adding a method such as `getResultType` in `TableSink`? The `resultType` means the type of the result of the current SQL query. IMO, even if we can specify the `resultType` in `TableSink`, it can only be used for validation purposes; we should not use it directly. Thoughts? ---
[GitHub] flink pull request #5027: [FLINK-8097] [table] Add built-in support for min/...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5027 [FLINK-8097] [table] Add built-in support for min/max aggregation for Date/Time ## What is the purpose of the change *This PR adds built-in support for min/max aggregation for Date/Time* ## Verifying this change - *Added tests in MaxAggFunctionTest, MaxWithRetractAggFunctionTest, MinAggFunctionTest, MinWithRetractAggFunctionTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink FLINK-8097 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5027.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5027 commit 7c0cf68279570cb8f8bd00e18b8fb38de943a645 Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-17T05:22:47Z [FLINK-8097] [table] Add built-in support for min/max aggregation for Date/Time ---
[GitHub] flink pull request #5025: [FLINK-8096] [table] Fix time material issue when ...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5025 [FLINK-8096] [table] Fix time material issue when write to TableSink ## What is the purpose of the change *This pull request fixes the time materialization issue when writing to a TableSink.* ## Verifying this change This change added tests and can be verified as follows: - *Added test TimeAttributesITCase.testCalcMaterialization3* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink fix-result-type Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5025.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5025 commit e672bf610e87dfd5e9847fa6324bf3ab2572b50b Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-17T02:53:31Z [FLINK-8096] [table] Fix time material issue when write to TableSink ---
[GitHub] flink issue #4936: [FLINK-7962] Add built-in support for min/max aggregation...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4936 @wuchong Thanks a lot for your review. @fhueske It would be great if you could also give some feedback. ---
[GitHub] flink pull request #4936: [FLINK-7962] Add built-in support for min/max aggr...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4936 [FLINK-7962] Add built-in support for min/max aggregation for Timestamp ## What is the purpose of the change *This pull request adds built-in support for min/max aggregation for Timestamp.* ## Brief change log - *Add TimestampMinAggFunction, TimestampMinWithRetractAggFunction, TimestampMaxAggFunction and TimestampMaxWithRetractAggFunction* - *Add TimestampOrdering* ## Verifying this change This change added tests and can be verified as follows: - *Added TimestampMaxAggFunctionTest, TimestampMaxWithRetractAggFunctionTest, TimestampMinAggFunctionTest and TimestampMinWithRetractAggFunctionTest* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink timestamp_minmax Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4936.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4936 commit 8c69d1daae156852fc88b5d5ac20683a8979c353 Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-11-02T03:26:25Z [FLINK-7962] Add built-in support for min/max aggregation for Timestamp ---
[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4502 @fhueske Could you help take a look at this PR in case you missed it? It has been ready for more than one month. Much appreciated. ---
[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys Any comments? ---
[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys It would be great if you could take a look. Much appreciated! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r135441351 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -421,6 +437,15 @@ private void addStopStateToLooping(final State loopingState) { untilCondition, true); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) && + times.getFrom() != times.getTo()) { + if (untilCondition != null) { + State sinkStateCopy = copy(sinkState); + originalStateMap.put(sinkState.getName(), sinkStateCopy); --- End diff -- originalStateMap is used when compiling the NFA and will be garbage collected after the NFA is created, so I think it's unnecessary to clear the entries. ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r135440842 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional); } - final IterativeCondition trueFunction = getTrueFunction(); - final State singletonState = createState(currentPattern.getName(), State.StateType.Normal); // if event is accepted then all notPatterns previous to the optional states are no longer valid final State sink = copyWithoutTransitiveNots(sinkState); singletonState.addTake(sink, takeCondition); + // if no element accepted the previous nots are still valid. + final IterativeCondition proceedCondition = getTrueFunction(); + // for the first state of a group pattern, its PROCEED edge should point to the following state of // that group pattern and the edge will be added at the end of creating the NFA for that group pattern if (isOptional && !headOfGroup(currentPattern)) { - // if no element accepted the previous nots are still valid. - singletonState.addProceed(proceedState, trueFunction); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + final IterativeCondition untilCondition = + (IterativeCondition) currentPattern.getUntilCondition(); + if (untilCondition != null) { + singletonState.addProceed( + originalStateMap.get(proceedState.getName()), + new AndCondition<>(proceedCondition, untilCondition)); --- End diff -- When untilCondition holds, the loop should break and the state should proceed to the next state. This is covered by the test case GreedyITCase#testGreedyUntilWithDummyEventsBeforeQuantifier. ---
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 Sure. Have created FLINK-7496 to track this issue. ---
[GitHub] flink issue #4513: [FLINK-6938] [cep] IterativeCondition should support Rich...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys I have updated the PR and it now only contains the RichFunction changes. ---
[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563 @dawidwys Have updated the PR according to your comments. ---
[GitHub] flink issue #4563: [FLINK-7479] [cep] Support to retrieve the past event by ...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4563 Thanks @dawidwys for the suggestion. I will update the PR accordingly. ---
[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 Thanks @dawidwys for the reminder. Yes, you're right and that makes sense to me. I will update the PR and remove the ConditionRegistry related changes. ---
[GitHub] flink pull request #4563: [FLINK-7479] [cep] Support to retrieve the past ev...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4563 [FLINK-7479] [cep] Support to retrieve the past event by an offset

## What is the purpose of the change

*Currently, it is already possible to retrieve the events matched to the specified pattern in IterativeCondition.Context, but there are also requirements to retrieve events by a physical offset. The retrieved events may not be matched to any pattern.*

## Brief change log

- *Add API retain() in Pattern*
- *Buffer the past events in NFA.process*
- *Access the past events by the newly added API getEventByOffset in IterativeCondition.Context*

## Verifying this change

This change added tests and can be verified as follows:

- *Added test in IterativeConditionsITCase*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs / JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink FLINK-7479

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4563.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4563

commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2
Author: Dian Fu <fudian...@alibaba-inc.com>
Date: 2017-08-18T12:50:25Z
[FLINK-7479] [cep] Support to retrieve the past event by an offset ---
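The buffering described in the PR ("buffer the past events in NFA.process", retrieve by physical offset) can be sketched as a bounded buffer of recently processed events. The class name `PastEventBuffer`, the fixed capacity, and the null-on-miss behaviour are assumptions for illustration; only the method name `getEventByOffset` comes from the PR description, and this is not the actual Flink implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch: keep the last `capacity` processed events so a condition
// can look back by a physical offset, even at events that matched no pattern.
public class PastEventBuffer<T> {

    private final Deque<T> buffer = new ArrayDeque<>();
    private final int capacity;

    public PastEventBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Called for every processed event; evicts the oldest once full. */
    public void add(T event) {
        if (buffer.size() == capacity) {
            buffer.removeFirst();
        }
        buffer.addLast(event);
    }

    /** Offset 0 is the most recent event, 1 the one before it, and so on. */
    public T getEventByOffset(int offset) {
        if (offset < 0 || offset >= buffer.size()) {
            return null; // too old to be buffered, or not yet seen
        }
        Iterator<T> it = buffer.descendingIterator();
        for (int i = 0; i < offset; i++) {
            it.next();
        }
        return it.next();
    }
}
```

The key design point the PR raises is that, unlike the matched-events lookup already in IterativeCondition.Context, a physical-offset lookup must also retain events that were not matched to any pattern, hence a separate buffer.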
[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4502 @fhueske @kl0u @dawidwys @wuchong Any feedback? ---
[GitHub] flink issue #4523: [FLINK-7123] [cep] Support timesOrMore in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4523 @kl0u That's OK. Thanks a lot. :) ---
[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 @dawidwys @kl0u In case you missed this PR, could you help take a look? Much appreciated. ---
[GitHub] flink issue #4523: [FLINK-7123] [cep] Support timesOrMore in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4523 @dawidwys @kl0u Could you help take a look at this PR? Thanks a lot. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132867483 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -340,6 +362,65 @@ public void resetNFAChanged() { return Tuple2.of(result, timeoutResult); } + private void discardComputationStatesAccordingToStrategy(Queue<ComputationState> computationStates, + Collection<Map<String, List>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) { + Set discardEvents = new HashSet<>(); + switch(afterMatchSkipStrategy.getStrategy()) { + case SKIP_TO_LAST: + for (Map<String, List> resultMap: matchedResult) { + for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) { + if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { + discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1)); + break; + } else { + discardEvents.addAll(keyMatches.getValue()); + } + } + } + break; + case SKIP_TO_FIRST: + for (Map<String, List> resultMap: matchedResult) { + for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) { + if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { + break; + } else { + discardEvents.addAll(keyMatches.getValue()); + } + } + } + break; + case SKIP_PAST_LAST_EVENT: + for (Map<String, List> resultMap: matchedResult) { + for (List eventList: resultMap.values()) { + discardEvents.addAll(eventList); + } + } + break; + } + if (!discardEvents.isEmpty()) { + List<ComputationState> discardStates = new ArrayList<>(); + for (ComputationState computationState : computationStates) { + Map<String, List> partialMatch = extractCurrentMatches(computationState); + for (List list: partialMatch.values()) { + for (T e: list) { + if (discardEvents.contains(e)) { + // discard the computation state. 
+ eventSharedBuffer.release( + NFAStateNameHandler.getOriginalNameFromInternal( + computationState.getState().getName()), + computationState.getEvent(), + computationState.getTimestamp(), + computationState.getCounter() + ); + discardStates.add(computationState); --- End diff -- Should add **break;** after **discardStates.add(computationState);**, right? ---
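The per-strategy discard logic in the quoted diff can be condensed into a self-contained sketch. The class, enum, and method names below are illustrative only, and the real code also releases shared-buffer references and removes computation states, which is omitted here; the sketch only computes the set of events whose partial matches must be discarded.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, non-Flink sketch of the AFTER MATCH SKIP discard-set computation.
public class AfterMatchSkipSketch {

    public enum SkipStrategy { SKIP_TO_FIRST, SKIP_TO_LAST, SKIP_PAST_LAST_EVENT }

    /** Each match is an ordered map from pattern name to the events it matched. */
    public static <T> Set<T> discardEvents(List<LinkedHashMap<String, List<T>>> matches,
                                           SkipStrategy strategy, String patternName) {
        Set<T> discard = new HashSet<>();
        for (LinkedHashMap<String, List<T>> match : matches) {
            for (Map.Entry<String, List<T>> entry : match.entrySet()) {
                boolean isTarget = entry.getKey().equals(patternName);
                if (strategy == SkipStrategy.SKIP_TO_FIRST && isTarget) {
                    break; // keep the named pattern's events and everything after
                }
                if (strategy == SkipStrategy.SKIP_TO_LAST && isTarget) {
                    List<T> events = entry.getValue();
                    // keep only the last event of the named pattern
                    discard.addAll(events.subList(0, events.size() - 1));
                    break;
                }
                // SKIP_PAST_LAST_EVENT, and every pattern before the target,
                // discards all matched events
                discard.addAll(entry.getValue());
            }
        }
        return discard;
    }
}
```

This also makes the review comment concrete: once an event of a computation state is found in the discard set, the state is discarded once, so continuing the inner scan is wasted work, hence the suggested `break;`.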
[GitHub] flink pull request #4523: [FLINK-7123] [cep] Support timesOrMore in CEP
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4523 [FLINK-7123] [cep] Support timesOrMore in CEP

## What is the purpose of the change

*This pull request adds timesOrMore to the CEP pattern API.*

## Verifying this change

This change added tests and can be verified as follows:

- *Added test in TimesOrMoreITCase*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink timesOrMore

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4523.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4523

commit 6ccbb0f53f1a860ae05b3d24c17408cf22a5aab0
Author: Dian Fu <fudian...@alibaba-inc.com>
Date: 2017-08-11T03:29:01Z
[FLINK-7123] [cep] Support timesOrMore in CEP ---
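As a rough model of what a timesOrMore(n) quantifier means (a pattern element matching at least n consecutive events), the following toy helper extracts qualifying runs from an event list. It is a self-contained illustration under that assumption, not the NFA-based implementation the PR adds to the Pattern API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustrative, non-Flink sketch of "n or more" matching semantics.
public class TimesOrMoreSketch {

    /** Returns every maximal consecutive run of length >= n whose events all satisfy cond. */
    public static <T> List<List<T>> runsOfAtLeast(List<T> events, Predicate<T> cond, int n) {
        List<List<T>> result = new ArrayList<>();
        List<T> run = new ArrayList<>();
        for (T event : events) {
            if (cond.test(event)) {
                run.add(event);
            } else {
                if (run.size() >= n) {
                    result.add(new ArrayList<>(run)); // close the finished run
                }
                run.clear();
            }
        }
        if (run.size() >= n) {
            result.add(run); // a run may end with the input
        }
        return result;
    }
}
```

For the input `a a b a a a` with n = 2, the helper yields the runs `[a, a]` and `[a, a, a]`; the real quantifier differs in that it also produces the sub-matches the NFA explores, not only maximal runs.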
[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4502 @fhueske Great, thanks. ---
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys Any comments? ---
[GitHub] flink issue #4513: [FLINK-6938][FLINK-6939] [cep] Not store IterativeConditi...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4513 As discussed with @wuchong offline, I will continue the work of #4145 as it is required by the CEP-on-SQL feature; see the PR for FLINK-7062 for details. @dawidwys @kl0u Could you help review? Actually, I just rebased the code of #4145 and addressed the comments there. Most of the fixes refer to the test branch https://github.com/kl0u/flink/tree/cep-iter-pr by @kl0u. I have not removed IterativeCondition as I think that would break backward compatibility. ---
[GitHub] flink pull request #4513: [FLINK-6938][FLINK-6939] [cep] Not store Iterative...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4513 [FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state and support RichFunction interface

## What is the purpose of the change

*The core idea is that a StateTransition is unique in an NFA graph, so we store the conditions in a map from StateTransition to IterativeCondition and the conditions need not be serialized with the NFA state. If I missed something, please point it out. This PR also includes FLINK-6938: IterativeCondition supports the RichFunction interface.*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink IterativeCondition

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4513.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4513

commit e8f4bfd55eb50b151b8160c7c8f8901114aa7606
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date: 2017-06-20T06:02:21Z
[FLINK-6938][FLINK-6939] [cep] Not store IterativeCondition with NFA state and support RichFunction interface

commit 994bc06c11bffbf129d7160a4db2a16db01199d8
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date: 2017-06-20T13:25:59Z
minor change

commit ae26dab180943f9b6609341d7447718f772b8b19
Author: Jark Wu <wuchong...@alibaba-inc.com>
Date: 2017-06-22T03:17:47Z
address dawid's comments

commit e8c28049d2eb96500f53d6e7f284ae074280b5a1
Author: Dian Fu <fudian...@alibaba-inc.com>
Date: 2017-08-10T12:25:50Z
Rebase the code ---
[GitHub] flink issue #4502: [FLINK-7062] [table, cep] Support the basic functionality...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4502 @kl0u @dawidwys @wuchong It would be great if you could take a look at this PR. It adds the basic support for CEP on SQL. Thanks in advance. :) ---
[GitHub] flink pull request #4502: [FLINK-7062] [table, cep] Support the basic functi...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4502 [FLINK-7062] [table, cep] Support the basic functionality of MATCH_RECOGNIZE

## What is the purpose of the change

*This pull request adds the basic support of MATCH_RECOGNIZE.*

## Brief change log

- *The MATCH_RECOGNIZE clause is transformed to a CEP job with the existing CEP API*

## Verifying this change

This change added tests and can be verified as follows:

- *Added tests that validate MATCH_RECOGNIZE is parsed correctly and the expected results are returned*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink FLINK-7062

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4502.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4502

commit 34f7704f72cdb9cf0e3808e779b35d72eadb3e7f
Author: Dian Fu <fudian...@alibaba-inc.com>
Date: 2017-08-08T11:10:24Z
[FLINK-7062] [table, cep] Support the basic functionality of MATCH_RECOGNIZE ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804949 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java --- @@ -0,0 +1,1068 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link Pattern#greedy()}. 
+ */ +public class GreedyITCase extends TestLogger { + + @Test + public void testGreedyZeroOrMore() { + List<StreamRecord> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyZeroOrMoreConsecutiveEndWithOptional() { + List<StreamRecord> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new Stream
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804902 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804887 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804932 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804764 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { untilCondition, true); - final IterativeCondition proceedCondition = getTrueFunction(); + IterativeCondition proceedCondition = getTrueFunction(); final State loopingState = createState(currentPattern.getName(), State.StateType.Normal); - loopingState.addProceed(sinkState, proceedCondition); + + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + State sinkStateCopy = copy(sinkState); --- End diff -- Make sense. Updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r131804398 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern pattern) { return copyOfSink; } + private State copy(final State state) { + final State copyOfState = new State<>(state.getName(), state.getStateType()); --- End diff -- Updated. ---
[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4418 @kl0u Good catch. Have updated the PR according to the comments. ---
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys Regarding times().greedy(), the result was not as expected and I have fixed the issue in the latest PR. Also updated the doc. ---
[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4418 @dawidwys Updated the contribution checklist. ---
[GitHub] flink issue #4418: [FLINK-7293] [cep] Support custom order by in PatternStre...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4418 @dawidwys Thanks a lot for your review. Have updated the PR. ---
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4418#discussion_r130896153 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java --- @@ -923,6 +934,126 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testCEPOperatorComparatorProcessTime() throws Exception { + Event startEvent1 = new Event(42, "start", 1.0); + Event startEvent2 = new Event(42, "start", 2.0); + SubEvent middleEvent1 = new SubEvent(42, "foo1", 3.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 4.0, 10.0); + Event endEvent1 = new Event(42, "end", 1.0); + + Event startEventK2 = new Event(43, "start", 1.0); + + KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearatorWithComparator(true); + OneInputStreamOperatorTestHarness<Event, Map<String, List>> harness = getCepTestHarness(operator); + + try { + harness.open(); + + harness.setProcessingTime(0L); + + harness.processElement(new StreamRecord<>(startEvent1, 1L)); --- End diff -- Updated ---
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4418#discussion_r130896034 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java --- @@ -257,7 +289,32 @@ public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exceptio @Override public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { - // not used + NFA nfa = getNFA(); + + // emit the events in order + for (IN event : sort(bufferedEvents.get())) { + processEvent(nfa, event, getProcessingTimeService().getCurrentProcessingTime()); + } + + // remove all buffered rows + bufferedEvents.clear(); + + updateNFA(nfa); + } + + private Iterable sort(Iterable iter) { + if (comparator == null) { + return iter; + } else { + // insert all events into the sort buffer + List sortBuffer = new ArrayList<>(); --- End diff -- Good advice. Updated. ---
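The buffering-and-sorting idea in the diff above can be sketched in isolation: incoming events are buffered, and when the processing-time timer fires they are drained in comparator order (or in arrival order if no comparator is given). This is a hedged, self-contained illustration only; the `SortedBuffer` class and its method names are invented for this sketch and are not Flink API:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch, not the actual Flink operator: buffer events as they
// arrive, then emit them in comparator order when the timer fires.
class SortedBuffer<T> {
    private final List<T> buffered = new ArrayList<>();
    private final Comparator<T> comparator; // null means: keep arrival order

    SortedBuffer(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    void add(T event) {
        buffered.add(event);
    }

    // Analogous to onProcessingTime() above: return the buffered events in
    // order and clear the buffer afterwards.
    List<T> drainSorted() {
        List<T> out = new ArrayList<>(buffered);
        if (comparator != null) {
            out.sort(comparator); // stable sort keeps arrival order for ties
        }
        buffered.clear();
        return out;
    }
}
```

Because `List.sort` is stable, events that compare equal are still emitted in the order they arrived, which matches the intent of only reordering where the comparator demands it.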
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4418#discussion_r130896085 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java --- @@ -923,6 +934,126 @@ public boolean filter(Event value) throws Exception { } } + @Test + public void testCEPOperatorComparatorProcessTime() throws Exception { + Event startEvent1 = new Event(42, "start", 1.0); + Event startEvent2 = new Event(42, "start", 2.0); + SubEvent middleEvent1 = new SubEvent(42, "foo1", 3.0, 10.0); + SubEvent middleEvent2 = new SubEvent(42, "foo2", 4.0, 10.0); + Event endEvent1 = new Event(42, "end", 1.0); + + Event startEventK2 = new Event(43, "start", 1.0); + + KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearatorWithComparator(true); + OneInputStreamOperatorTestHarness<Event, Map<String, List>> harness = getCepTestHarness(operator); + + try { + harness.open(); + + harness.setProcessingTime(0L); + + harness.processElement(new StreamRecord<>(startEvent1, 1L)); --- End diff -- Updated. ---
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4418#discussion_r130895852 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala --- @@ -36,8 +38,26 @@ object CEP { * @tparam T Type of the input events * @return Resulting pattern stream */ - def pattern[T](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = { + def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T] = { wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern)) } + + /** +* Transforms a [[DataStream]] into a [[PatternStream]] in the Scala API. +* See [[org.apache.flink.cep.CEP}]]for a more detailed description how the underlying +* Java API works. +* +* @param input DataStream containing the input events +* @param patternPattern specification which shall be detected +* @param comparator Comparator to sort events --- End diff -- Updated the doc. ---
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4418#discussion_r130895782 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java --- @@ -38,4 +40,17 @@ public static PatternStream pattern(DataStream input, Pattern<T, ?> pattern) { return new PatternStream<>(input, pattern); } + + /** +* Creates a {@link PatternStream} from an input data stream and a pattern. +* +* @param input DataStream containing the input events +* @param pattern Pattern specification which shall be detected +* @param comparator Comparator to sort events --- End diff -- Updated the doc. ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r130883414 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) { true); IterativeCondition proceedCondition = getTrueFunction(); - if (currentPattern.getQuantifier().isGreedy()) { - proceedCondition = getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition)); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { + proceedCondition = getGreedyCondition( + proceedCondition, + takeCondition, + ignoreCondition, + followingTakeCondition);; } final State loopingState = createState(currentPattern.getName(), State.StateType.Normal, - currentPattern.getQuantifier().isGreedy()); + currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)); loopingState.addProceed(sinkState, proceedCondition); loopingState.addTake(takeCondition); addStopStateToLooping(loopingState); if (ignoreCondition != null) { final State ignoreState = createState(currentPattern.getName(), State.StateType.Normal, - currentPattern.getQuantifier().isGreedy()); + currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)); ignoreState.addTake(loopingState, takeCondition); ignoreState.addIgnore(ignoreCondition); loopingState.addIgnore(ignoreState, ignoreCondition); + if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) { --- End diff -- @dawidwys Good advice. Thanks a lot :). I have updated the PR per the solution you suggested. ---
[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4373#discussion_r130829080 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala --- @@ -55,12 +55,13 @@ class JoinTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(1), -term("select", "a", "b", "proctime") +term("select", "a", "b", "-(proctime, 360) AS -", --- End diff -- I think this issue may be caused by CALCITE-1753, still investigating the changes in that JIRA. ---
[GitHub] flink pull request #4373: [FLINK-6429] [table] Bump up Calcite version to 1....
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4373#discussion_r130801549 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala --- @@ -55,12 +55,13 @@ class JoinTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(1), -term("select", "a", "b", "proctime") +term("select", "a", "b", "-(proctime, 360) AS -", --- End diff -- In the original test, the calculation of **proctime** happens when doing the **Join**, while currently it's pushed down to the **DataStreamCalc**. It seems that this is not correct? ---
[GitHub] flink issue #4373: [FLINK-6429] [table] Bump up Calcite version to 1.13.
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4373 I have done some investigation into the test failure of **JoinITCase.testJoinWithExpressionPreds** and would like to share my findings and solutions for your reference. The cause of this issue is that for preserved expressions, in **PushProjector#createProjectRefsAndExprs**, the column names corresponding to them will be the operator names of the expressions. For example, for the expression **a - 1** in the test case, the corresponding column will be named **-**. I think this behavior is not expected, so I have copied **PushProjector** from Calcite and made some changes to it (line 507). Please refer to [here](https://github.com/dianfu/flink/commit/efa9641e0bd395a3679b0d496b60e3d42aa7b832) for more information. Of the files copied from Calcite, all can be removed except **SqlTimestampAddFunction** and **AuxiliaryConverter**. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264164 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { --- End diff -- Should also consider the situation **Proceed to Final state**. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130266394 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + break; + case SKIP_TO_LAST: + if (currentState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. 
+ Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); + resultingComputationStates.addAll(computationStates); + } + break; + } break; } } - if (computationState.isStartState()) { - int totalBranches = calculateIncreasingSelfState( - outgoingEdges.getTotalIgnoreBranches(), - outgoingEdges.getTotalTakeBranches()); - - DeweyNumber startVersion = computationState.getVersion().increase(totalBranches); - ComputationState startState = ComputationState.createStartState(this, computationState.getState(), startVersion); - resultingComputationStates.add(startState); + if (computationState.isStartState() && + skipStrategy.getStrategy(
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130264936 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && --- End diff -- Should use NFAStateNameHandler.getOriginalNameFromInternal() to compare state names. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130265354 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { --- End diff -- Why is the callLevel needed? ---
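For readers following the skip-strategy review above, the intended semantics of `SKIP_PAST_LAST_EVENT` can be shown with a small stand-alone sketch: after a match completes, matching resumes only after the last event of that match, so overlapping matches are discarded. The class below is purely illustrative (matches are modeled as `[start, end]` index pairs over the event stream, sorted by start index) and is not the actual NFA code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of SKIP_PAST_LAST_EVENT, not Flink code: keep a match
// only if it starts after the last event of the previously kept match.
class SkipPastLastEvent {
    static List<int[]> apply(List<int[]> matches) {
        List<int[]> kept = new ArrayList<>();
        int resumeAfter = -1; // index of the last event of the last kept match
        for (int[] m : matches) { // assumed sorted by start index m[0]
            if (m[0] > resumeAfter) {
                kept.add(m);
                resumeAfter = m[1]; // skip past this match's last event
            }
        }
        return kept;
    }
}
```

For example, with candidate matches `[0,2]`, `[1,3]`, `[3,4]`, the overlapping match `[1,3]` is dropped because it starts before index 2 where the first kept match ends.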
[GitHub] flink pull request #4418: [FLINK-7293] [cep] Support custom order by in Patt...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4418 [FLINK-7293] [cep] Support custom order by in PatternStream *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink support_comparator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4418.patch
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4331 Sorry for the late response. I think this feature is very useful and agree that we should have a clear idea of what the behavior should be for each skip strategy. I noticed that there are already some discussions in FLINK-3703 which we can refer to. I will take a look at this PR and at FLINK-3703 over the next two days and will post my thoughts. ---
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys Thanks a lot for the review. I have updated the patch. Currently, there is something wrong when the greedy state is followed by an optional state. This is covered by the test case GreedyITCase.testGreedyZeroOrMoreBeforeOptional2 (duplicate results are produced). The solutions I have in mind are either removing the duplicate results before returning them from the NFA or disabling this case for the time being. What are your thoughts? Do you have any suggestions for this problem? ---
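The first option mentioned in the comment above, removing duplicate results before the NFA returns them, could look roughly like the following. This is only a sketch under the assumption that two matches are duplicates when they are equal as lists of events; `MatchDeduplicator` is a hypothetical helper, not Flink code:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical helper, not Flink code: drop duplicate match lists while
// keeping the first occurrence of each match in its original order.
class MatchDeduplicator {
    static <T> List<List<T>> deduplicate(List<List<T>> matches) {
        // LinkedHashSet preserves insertion order and removes equal lists
        // (List.equals compares elements pairwise).
        return new ArrayList<>(new LinkedHashSet<>(matches));
    }
}
```

This relies on the event type having a sensible `equals`/`hashCode`; if matches can contain distinct events that compare equal, a key based on event identity or timestamps would be needed instead.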
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r129832284 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link Pattern#greedy()}. 
+ */ +public class GreedyITCase extends TestLogger { + + @Test + public void testGreedyFollowedBy() { + List<StreamRecord> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 2.0); + Event a3 = new Event(43, "a", 2.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(a2, 3)); + inputEvents.add(new StreamRecord<>(a3, 4)); + inputEvents.add(new StreamRecord<>(d, 5)); + + // c a* d + Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy("middle").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).oneOrMore().optional().greedy().followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List>newArrayList( + Lists.newArrayList(c, a1, a2, a3, d) + )); + } + + @Test + public void testGreedyUtil() { + List<StreamRecord> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event a2 = new Event(42, "a", 3.0); + Event a3 = new Event(43, "a", 4.0); + Event d = new Event(44, "d", 3.0); + + inputEvents.add(new StreamRecord<>(c, 1)); +
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r129830324 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java --- @@ -105,6 +107,14 @@ public void optional() { properties.add(Quantifier.QuantifierProperty.OPTIONAL); } + public void greedy() { + greedy = true; --- End diff -- Makes sense. Updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4296#discussion_r129830367 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java --- @@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() { "Current quantifier is: " + quantifier); } } + + private void checkIfNoFollowedByAny() { + if (quantifier.getConsumingStrategy() == ConsumingStrategy.SKIP_TILL_ANY) { --- End diff -- Makes sense. Updated. ---
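The guard discussed in this diff rejects greedy() on a pattern whose consuming strategy is SKIP_TILL_ANY, i.e. one introduced with followedByAny(), presumably because "consume as many events as possible" is ill-defined when arbitrary events may be skipped between matches. A standalone sketch of the guard's shape (hypothetical class and exception message; only the enum constant names follow the quoted code):

```java
public class QuantifierGuard {

    // Mirrors the consuming-strategy names from the quoted Flink code.
    enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    // Rejects greedy() for a pattern that may skip arbitrary events.
    static void checkIfNoFollowedByAny(ConsumingStrategy strategy) {
        if (strategy == ConsumingStrategy.SKIP_TILL_ANY) {
            throw new UnsupportedOperationException(
                "Greedy is not applicable to a followedByAny pattern");
        }
    }

    public static void main(String[] args) {
        // Allowed: strict and relaxed contiguity.
        checkIfNoFollowedByAny(ConsumingStrategy.STRICT);
        checkIfNoFollowedByAny(ConsumingStrategy.SKIP_TILL_NEXT);
        // Rejected: non-deterministic relaxed contiguity.
        boolean threw = false;
        try {
            checkIfNoFollowedByAny(ConsumingStrategy.SKIP_TILL_ANY);
        } catch (UnsupportedOperationException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError();
    }
}
```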
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys OK. Have a good time. :) ---
[GitHub] flink issue #4296: [FLINK-7147] [cep] Support greedy quantifier in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4296 @dawidwys Could you take a look at this PR? Thanks a lot in advance. ---
[GitHub] flink pull request #4318: [FLINK-7170] [cep] Fix until condition when the co...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4318 [FLINK-7170] [cep] Fix until condition when the contiguity is strict Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink fix-until-condition Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4318.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4318 commit 605311b8db8669ab1086eebc953892f90eb27ffa Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-07-13T09:21:30Z [FLINK-7170] [cep] Fix until condition when the contiguity is strict ---
[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4296 [FLINK-7147] [cep] Support greedy quantifier in CEP You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink greedy_quantifier Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4296.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4296 commit 647d4c58309990d9fc924bc4343ef811dacf4ef1 Author: Dian Fu <fudian...@alibaba-inc.com> Date: 2017-07-11T08:03:42Z [FLINK-7147] [cep] Support greedy quantifier in CEP ---
[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 @dawidwys Thanks a lot for the review. Updated the doc. ---
[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4153 @dawidwys Thanks a lot for your comments. I have updated the PR and it should address all of them. :) ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125178823 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. + */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { --- End diff -- updated. ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176853 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { + List<StreamRecord> inputEvents = new ArrayList<>(); + + Event c = new Event(40, "c", 1.0); + Event a1 = new Event(41, "a", 2.0); + Event b1 = new Event(42, "b", 3.0); + Event a2 = new Event(43, "a", 4.0); + Event b2 = new Event(44, "b", 5.0); + Event d = new Event(45, "d", 6.0); + + inputEvents.add(new StreamRecord<>(c, 1)); + inputEvents.add(new StreamRecord<>(a1, 2)); + inputEvents.add(new StreamRecord<>(b1, 3)); + inputEvents.add(new StreamRecord<>(a2, 4)); + inputEvents.add(new StreamRecord<>(b2, 5)); + inputEvents.add(new StreamRecord<>(d, 6)); + + Pattern<Event, ?> pattern = Pattern.begin("start").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("c"); + } + }).followedBy(Pattern.begin("middle1").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("a"); + } + }).followedBy("middle2").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("b"); + } + })).times(2).followedBy("end").where(new SimpleCondition() { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("d"); + } + }); + + NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); + + final List<List> resultingPatterns = feedNFA(inputEvents, nfa); + + compareMaps(resultingPatterns, Lists.<List>newArrayList( + Lists.newArrayList(c, a1, b1,
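The test quoted above builds the sequence c (a b){2} d out of a pattern group with times(2). As a sanity check of the expected match shape, the same structure expressed as a plain regular expression (an analogy only; CEP's followedBy additionally tolerates non-matching events in between, which regex matching does not model):

```java
import java.util.regex.Pattern;

public class GroupAnalogy {
    public static void main(String[] args) {
        // c (a b){2} d, with each letter standing in for one event.
        Pattern p = Pattern.compile("c(ab){2}d");
        // The group (ab) must repeat exactly twice between c and d.
        if (!p.matcher("cababd").matches()) throw new AssertionError();
        // Only one occurrence of (ab): no match.
        if (p.matcher("cabd").matches()) throw new AssertionError();
    }
}
```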
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176801 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176457 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176383 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- (same diff context as quoted above) ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125176384 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java --- @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.cep.Event; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.GroupPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps; +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA; + +/** + * IT tests covering {@link GroupPattern}. 
+ */ +@SuppressWarnings("unchecked") +public class GroupITCase extends TestLogger { + + @Test + public void testGroupFollowedBy() { --- End diff -- done ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173912 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java --- @@ -430,6 +431,54 @@ public Quantifier getQuantifier() { return this; } + /** +* Starts a new pattern sequence. The provided pattern is the initial pattern +* of the new sequence. +* +* @param group the pattern to begin with +* @return the first pattern of a pattern sequence +*/ + public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) { + return new GroupPattern<>(null, group); + } + + /** +* Appends a new pattern to the existing one. The new pattern enforces non-strict --- End diff -- done ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173877 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java --- @@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception { } @Test + public void testTimesRangeFromZero() { --- End diff -- done ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173876 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java --- @@ -153,9 +153,8 @@ public int hashCode() { private final int to; private Times(int from, int to) { - Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0."); + Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0."); --- End diff -- done. ---
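The precondition change quoted above requires from to be strictly positive; a times range starting at zero is expressed through optional() instead (see the testTimesRangeFromZero test referenced in this PR). A standalone sketch of the tightened validation (the real class uses Flink's Preconditions and its own wording for the upper-bound message; this is only illustrative):

```java
public class TimesValidation {

    // Validates a times(from, to) range: from must be strictly positive,
    // and to must not be below from.
    static void checkTimes(int from, int to) {
        if (from <= 0) {
            throw new IllegalArgumentException(
                "The from should be a positive number greater than 0.");
        }
        if (to < from) {
            throw new IllegalArgumentException(
                "The to should be greater than or equal to the from.");
        }
    }

    public static void main(String[] args) {
        checkTimes(1, 3); // valid: times(1, 3)
        boolean threw = false;
        try {
            checkTimes(0, 3); // zero is rejected; use times(1, 3).optional() instead
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError();
    }
}
```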
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173871 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java --- @@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) { if (usedNames.contains(name)) { throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique."); } + usedNames.add(name); + } + --- End diff -- updated ---
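The diff above makes the handler both verify and record each pattern name in one call, so a second registration of the same name fails. A minimal standalone sketch of that contract (hypothetical class; the real handler throws Flink's MalformedPatternException rather than IllegalArgumentException):

```java
import java.util.HashSet;
import java.util.Set;

public class StateNameHandler {

    private final Set<String> usedNames = new HashSet<>();

    // Rejects a name that was already registered, then records it.
    void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            throw new IllegalArgumentException(
                "Duplicate pattern name: " + name + ". Names must be unique.");
        }
        usedNames.add(name);
    }

    public static void main(String[] args) {
        StateNameHandler handler = new StateNameHandler();
        handler.checkNameUniqueness("start");
        handler.checkNameUniqueness("end");
        boolean threw = false;
        try {
            handler.checkNameUniqueness("start"); // second registration fails
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError();
    }
}
```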
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173832 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) { } /** +* Create all the states for the group pattern. +* +* @param groupPattern the group pattern to create the states for +* @param sinkState the state that the group pattern being converted should point to +* @param proceedState the state that the group pattern being converted should proceed to +* @param isOptional whether the group pattern being converted is optional +* @return the first state of the states of the group pattern +*/ + private State createGroupPatternState( + final GroupPattern<T, ?> groupPattern, + final State sinkState, + final State proceedState, + final boolean isOptional) { + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + + Pattern<T, ?> oldCurrentPattern = currentPattern; + Pattern<T, ?> oldFollowingPattern = followingPattern; + GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; + try { + State lastSink = sinkState; + currentGroupPattern = groupPattern; + currentPattern = groupPattern.getRawPattern(); + lastSink = createMiddleStates(lastSink); + lastSink = convertPattern(lastSink); + if (isOptional) { + // for the first state of a group pattern, its PROCEED edge should point to + // the following state of that group pattern + lastSink.addProceed(proceedState, trueFunction); + } + return lastSink; + } finally { + currentPattern = oldCurrentPattern; + followingPattern = oldFollowingPattern; + currentGroupPattern = oldGroupPattern; + } + } + + /** +* Create the states for the group pattern as a looping one. 
+* +* @param groupPattern the group pattern to create the states for +* @param sinkState the state that the group pattern being converted should point to +* @return the first state of the states of the group pattern +*/ + private State createLoopingGroupPatternState( + final GroupPattern<T, ?> groupPattern, + final State sinkState) { + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + + Pattern<T, ?> oldCurrentPattern = currentPattern; + Pattern<T, ?> oldFollowingPattern = followingPattern; + GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; + try { --- End diff -- updated. ---
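Both `createGroupPatternState` and `createLoopingGroupPatternState` in the diff follow a save-and-restore discipline: the compiler's current pattern context is stashed in locals, swapped for the nested group, and restored in a `finally` block so the outer compilation resumes correctly even if conversion throws. A stripped-down sketch of that discipline (field and method names are illustrative, not Flink's actual API):

```java
// Sketch of the save/restore pattern used when compiling a nested group:
// context fields are saved, mutated for the group, and always restored.
class CompilerContext {
    String currentPattern;
    String currentGroupPattern;

    String compileGroup(String group) {
        String oldCurrentPattern = currentPattern;
        String oldGroupPattern = currentGroupPattern;
        try {
            currentGroupPattern = group;
            currentPattern = "raw:" + group;  // stand-in for groupPattern.getRawPattern()
            return "states(" + currentPattern + ")";
        } finally {
            // Restore the outer context even if compilation failed.
            currentPattern = oldCurrentPattern;
            currentGroupPattern = oldGroupPattern;
        }
    }
}
```

This is why the review focused on the `try { ... } finally { ... }` blocks: without the restore, a nested group would leak its context into the compilation of subsequent patterns.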
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173828 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State loopingState) { } /** +* Create all the states for the group pattern. +* +* @param groupPattern the group pattern to create the states for +* @param sinkState the state that the group pattern being converted should point to +* @param proceedState the state that the group pattern being converted should proceed to +* @param isOptional whether the group pattern being converted is optional +* @return the first state of the states of the group pattern +*/ + private State createGroupPatternState( + final GroupPattern<T, ?> groupPattern, + final State sinkState, + final State proceedState, + final boolean isOptional) { + final IterativeCondition trueFunction = BooleanConditions.trueFunction(); + + Pattern<T, ?> oldCurrentPattern = currentPattern; + Pattern<T, ?> oldFollowingPattern = followingPattern; + GroupPattern<T, ?> oldGroupPattern = currentGroupPattern; + try { --- End diff -- updated. ---
[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4153#discussion_r125173760 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java --- @@ -366,8 +366,9 @@ public Quantifier getQuantifier() { checkIfNoNotPattern(); checkIfQuantifierApplied(); this.quantifier = Quantifier.times(quantifier.getConsumingStrategy()); - if (from == 0) { --- End diff -- Thanks for the suggestion, created PR: https://github.com/apache/flink/pull/4242 ---
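The `Pattern.times` change above concerns how a bounded quantifier handles its lower bound. As background, a "times" quantifier accepts a pattern repeated between `from` and `to` times, and a lower bound of zero effectively makes the pattern optional. A generic, Flink-independent sketch of such a bounded quantifier (class and method names are hypothetical, for illustration only):

```java
// Generic sketch of a bounded "times" quantifier: an occurrence count
// matches if it falls within [from, to]; a lower bound of 0 makes the
// quantified pattern optional.
class Times {
    final int from;
    final int to;

    Times(int from, int to) {
        if (from < 0 || to < from) {
            throw new IllegalArgumentException("Invalid bounds: " + from + ".." + to);
        }
        this.from = from;
        this.to = to;
    }

    boolean accepts(int occurrences) {
        return occurrences >= from && occurrences <= to;
    }

    boolean isOptional() {
        return from == 0;
    }
}
```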