[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4632 I added the test case to the `CEP Operator` tests and addressed the inline comment. Thanks so much. ---
[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4673 Hi @dawidwys, thanks for the tip; that's what I was trying to add. Sorry for the missing `times()`. ---
[GitHub] flink issue #4673: [hotfix] [cep] Fix afterMatchStrategy parameter missing i...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4673 Hi @dawidwys, I just found this issue in the after-match feature, which causes `AfterMatchSkipStrategy` not to work. It may have been introduced while resolving merge conflicts. I fixed it in this PR and added a simple test to catch such problems. Please merge this into the repository. Thanks. ---
[GitHub] flink pull request #4673: [hotfix] [cep] Fix afterMatchStrategy parameter mi...
GitHub user yestinchen opened a pull request: https://github.com/apache/flink/pull/4673 [hotfix] [cep] Fix afterMatchStrategy parameter missing issue ## What is the purpose of the change Fix the missing afterMatchSkipStrategy parameter when calling the `nfa.process()` function. This issue may have been introduced while resolving merge conflicts. ## Brief change log ## Verifying this change This change added tests and can be verified as follows: an after-match test case was added to `CEPITCase` to catch this kind of problem. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yestinchen/flink aftermatch-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4673 commit 060dcde4bd9662c98ade061c1698fcff5bb702b3 Author: Yestin <873915...@qq.com> Date: 2017-09-14T16:01:53Z [hotfix] [cep] Fix afterMatchStrategy parameter missing issue This issue may be introduced during merging conflicts. ---
[GitHub] flink pull request #4632: [FLINK-7563] [cep] Fix watermark semantics in cep ...
GitHub user yestinchen opened a pull request: https://github.com/apache/flink/pull/4632 [FLINK-7563] [cep] Fix watermark semantics in cep and related tests. ## What is the purpose of the change Correct the watermark semantics in CEP. ## Brief change log - Treat an element as late when its timestamp is less than or equal to the current watermark (timestamp <= watermark). ## Verifying this change This change is already covered by existing tests, such as `testCEPOperatorCleanupEventTime()` and `testCEPOperatorSerializationWRocksDB()` in `CEPOperatorTest.java`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yestinchen/flink FLINK-7563 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4632.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4632 commit 419921c81be20b0c00dfeaba287f603af2600fad Author: Yestin <873915...@qq.com> Date: 2017-08-31T21:09:30Z [FLINK-7563] [cep] Fix watermark semantics in cep and related tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
---
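PR #4632 changes CEP so that an element whose timestamp is less than or equal to the current watermark counts as late. A minimal sketch of that predicate follows; `LatenessCheck` and `isLate` are hypothetical names used for illustration, not the code in `CEPOperator`.

```java
/** Hypothetical helper illustrating the lateness rule described in FLINK-7563. */
final class LatenessCheck {

    private LatenessCheck() {}

    /**
     * Returns true if an element should be treated as late.
     * A watermark with value w declares that no more elements with
     * timestamp <= w should arrive, so an element whose timestamp equals
     * the watermark is already late under the corrected semantics.
     */
    static boolean isLate(long timestamp, long currentWatermark) {
        return timestamp <= currentWatermark;
    }
}
```

With a current watermark of 10, an element stamped 10 is late, while one stamped 11 is still processed normally.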
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 @dawidwys Thanks for the reviews and comments; please change the documentation during the merge. And @kl0u, thanks for the reviews! ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r136169035 --- Diff: docs/dev/libs/cep.md --- @@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10)) +### After Match Skip Strategy + +For a given pattern, there can be many successful matches as data stream flows. In order to control how to restart the match process after a successful match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. There're four types of skip strategies, listed as follows: --- End diff -- That sounds good to me; please change it during the merge. Thanks a lot. ---
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 Hi @dawidwys, sorry for the late response. Thanks for your reviews; I have updated the test and the documentation. Please take a look when you have time. Thanks. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r133478539 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -340,6 +362,65 @@ public void resetNFAChanged() { return Tuple2.of(result, timeoutResult); } + private void discardComputationStatesAccordingToStrategy(Queue<ComputationState> computationStates, --- End diff -- You are absolutely right. Thanks for the tip. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r133478273 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -150,6 +160,29 @@ long getWindowTime() { } /** +* Check pattern after match skip strategy. +*/ --- End diff -- We only need to check the skip strategy before compiling the `Pattern` into an `NFA`, so I think it's more reasonable to place the check here. Also, we need to check whether the `patternName` field in the `AfterMatchSkipStrategy` is a valid reference, which cannot easily be done in the `Pattern` class. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r133091608 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -340,6 +362,65 @@ public void resetNFAChanged() { return Tuple2.of(result, timeoutResult); } + private void discardComputationStatesAccordingToStrategy(Queue<ComputationState> computationStates, + Collection<Map<String, List>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) { + Set discardEvents = new HashSet<>(); + switch(afterMatchSkipStrategy.getStrategy()) { + case SKIP_TO_LAST: + for (Map<String, List> resultMap: matchedResult) { + for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) { + if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { + discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1)); + break; + } else { + discardEvents.addAll(keyMatches.getValue()); + } + } + } + break; + case SKIP_TO_FIRST: + for (Map<String, List> resultMap: matchedResult) { + for (Map.Entry<String, List> keyMatches : resultMap.entrySet()) { + if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) { + break; + } else { + discardEvents.addAll(keyMatches.getValue()); + } + } + } + break; + case SKIP_PAST_LAST_EVENT: + for (Map<String, List> resultMap: matchedResult) { + for (List eventList: resultMap.values()) { + discardEvents.addAll(eventList); + } + } + break; + } + if (!discardEvents.isEmpty()) { + List<ComputationState> discardStates = new ArrayList<>(); + for (ComputationState computationState : computationStates) { + Map<String, List> partialMatch = extractCurrentMatches(computationState); + for (List list: partialMatch.values()) { + for (T e: list) { + if (discardEvents.contains(e)) { + // discard the computation state. 
+ eventSharedBuffer.release( + NFAStateNameHandler.getOriginalNameFromInternal( + computationState.getState().getName()), + computationState.getEvent(), + computationState.getTimestamp(), + computationState.getCounter() + ); + discardStates.add(computationState); --- End diff -- Yes, you are right. Thanks for pointing it out! ---
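The quoted `discardComputationStatesAccordingToStrategy` first decides, per strategy, which matched events must be discarded, and then drops every partial match that contains one of those events. The following is a simplified, standalone sketch of that two-step logic, assuming a completed match is a pattern-ordered map from pattern name to events. `SkipKind`, `DiscardSketch`, `eventsToDiscard` and `prunePartialMatches` are hypothetical names, and the shared-buffer release that the real NFA performs is deliberately omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the strategies handled in the quoted switch. */
enum SkipKind { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

final class DiscardSketch {

    private DiscardSketch() {}

    /**
     * Computes the events to discard after one completed match. The map must
     * iterate in pattern order (e.g. a LinkedHashMap), mirroring how the diff
     * walks resultMap.entrySet(). As in the diff, an unknown patternName under
     * SKIP_TO_FIRST / SKIP_TO_LAST ends up discarding every matched event.
     */
    static <E> Set<E> eventsToDiscard(Map<String, List<E>> match, SkipKind kind, String patternName) {
        Set<E> discard = new HashSet<>();
        for (Map.Entry<String, List<E>> entry : match.entrySet()) {
            List<E> events = entry.getValue();
            boolean isTarget = entry.getKey().equals(patternName);
            switch (kind) {
                case SKIP_PAST_LAST_EVENT:
                    discard.addAll(events); // every matched event is consumed
                    break;
                case SKIP_TO_FIRST:
                    if (isTarget) {
                        return discard; // keep the named pattern and everything after it
                    }
                    discard.addAll(events);
                    break;
                case SKIP_TO_LAST:
                    if (isTarget) {
                        // keep only the last event mapped to the named pattern
                        discard.addAll(events.subList(0, events.size() - 1));
                        return discard;
                    }
                    discard.addAll(events);
                    break;
                default:
                    return discard; // SKIP_TO_NEXT_EVENT: nothing is discarded
            }
        }
        return discard;
    }

    /** Keeps only partial matches that share no event with the discard set. */
    static <E> List<List<E>> prunePartialMatches(List<List<E>> partials, Set<E> discard) {
        List<List<E>> kept = new ArrayList<>();
        for (List<E> partial : partials) {
            boolean overlaps = false;
            for (E e : partial) {
                if (discard.contains(e)) {
                    overlaps = true;
                    break;
                }
            }
            if (!overlaps) {
                kept.add(partial);
            }
        }
        return kept;
    }
}
```

For a match `{a: [a1, a2], b: [b1, b2]}`, `SKIP_TO_FIRST` on `b` discards `{a1, a2}`, `SKIP_TO_LAST` on `b` discards `{a1, a2, b1}`, and `SKIP_PAST_LAST_EVENT` discards all four events.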
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 Thanks for your reviews, @dawidwys! I'll update the doc in the following commits. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132386948 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -150,6 +160,59 @@ long getWindowTime() { } /** +* Check pattern after match skip strategy. +*/ + private void checkPatternSkipStrategy() { + AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy(); + if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST || + afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) { + Pattern<T, ?> pattern = currentPattern; + while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) { + pattern = pattern.getPrevious(); + } + // pattern name match check. + if (pattern == null) { + throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " + + "can not be found in the given Pattern"); + } else { + // can not be used with optional states. + if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { + throw new MalformedPatternException("the AfterMatchSkipStrategy " + + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern"); + } + } + + // start position check. + if (pattern.getPrevious() == null) { --- End diff -- Great, I'll just remove all those optional state checks. ---
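In the quoted `checkPatternSkipStrategy`, the `while` loop calls `pattern.getName()` before the `pattern == null` check runs, so a name that does not exist in the pattern makes the loop walk past the first pattern and fail with a `NullPointerException` instead of the intended `MalformedPatternException`. A null-safe version of the lookup is sketched below. `PatternNode` is a hypothetical stand-in for `Pattern` (only a name and a link to the previous pattern), and `IllegalArgumentException` stands in for `MalformedPatternException` so the sketch has no Flink dependency.

```java
/** Hypothetical stand-in for a Pattern: a name and a link to the previous pattern. */
final class PatternNode {
    final String name;
    final PatternNode previous;

    PatternNode(String name, PatternNode previous) {
        this.name = name;
        this.previous = previous;
    }
}

final class SkipTargetLookup {

    private SkipTargetLookup() {}

    /**
     * Walks from the last pattern back to the first and returns the node named
     * targetName. The null check is part of the loop condition, so an unknown
     * name yields a descriptive exception rather than a NullPointerException.
     */
    static PatternNode findTarget(PatternNode last, String targetName) {
        PatternNode current = last;
        while (current != null && !current.name.equals(targetName)) {
            current = current.previous;
        }
        if (current == null) {
            throw new IllegalArgumentException(
                "the pattern name '" + targetName + "' specified in the skip strategy "
                    + "cannot be found in the given pattern");
        }
        return current;
    }
}
```

Moving the null check into the loop condition is the smallest change that keeps the original control flow: the loop still stops at the named pattern, and the existing "not found" branch becomes reachable.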
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132349108 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.List; + + +/** + * Indicate the skip strategy after a match process. + * There're four kinds of strategies: + * SKIP_PAST_LAST_EVENT, + * SKIP_TO_NEXT_EVENT, + * SKIP_TO_FIRST_PATTERN and + * SKIP_TO_LAST_PATTERN + * + */ +public class AfterMatchSkipStrategy implements Serializable { + + // default strategy + SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT; + + // fields + String patternName = null; + + public AfterMatchSkipStrategy(){ + this(SkipStrategy.SKIP_TO_NEXT_EVENT, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy) { + this(strategy, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) { --- End diff -- Good idea, I like that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132348950 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -150,6 +160,59 @@ long getWindowTime() { } /** +* Check pattern after match skip strategy. +*/ + private void checkPatternSkipStrategy() { + AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy(); + if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST || + afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) { + Pattern<T, ?> pattern = currentPattern; + while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) { + pattern = pattern.getPrevious(); + } + // pattern name match check. + if (pattern == null) { + throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " + + "can not be found in the given Pattern"); + } else { + // can not be used with optional states. + if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) { + throw new MalformedPatternException("the AfterMatchSkipStrategy " + + afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern"); + } + } + + // start position check. + if (pattern.getPrevious() == null) { --- End diff -- I agree with you that the fallback approach is much easier to understand and maintain. If we discard nothing, the effective semantics are those of SKIP_TO_NEXT_EVENT for the next match process. However, this changes the matching semantics, which may lead to incorrect results, and I think users should be aware of what happens. My original thought was to add a configuration switch that lets users choose between throwing exceptions and falling back to a default skip strategy. Do you have any ideas about that?
---
[GitHub] flink pull request #4479: [FLINK-7309][hotfix] fix NullPointerException when...
GitHub user yestinchen opened a pull request: https://github.com/apache/flink/pull/4479 [FLINK-7309][hotfix] fix NullPointerException when selecting null fields ## What is the purpose of the change This pull request addresses FLINK-7309 by adding a null check before unboxing input fields. ## Brief change log - Add a null check before unboxing input fields. ## Verifying this change This change added tests and can be verified as follows: - Added a test case that selects a null value from a `Timestamp` field. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yestinchen/flink FLINK-7309 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4479 commit 7cbbd13b723df11e72ccb115b5266104b0b01183 Author: Yestin <873915...@qq.com> Date: 2017-08-04T09:21:03Z [FLINK-7309][hotfix] fix NullPointerException when selecting null fields. ---
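FLINK-7309 is an instance of a general Java pitfall: auto-unboxing a `null` wrapper such as `Long` into a primitive `long` throws `NullPointerException`. The code that PR #4479 actually changes is not quoted here, so the sketch below only illustrates the guard pattern in plain Java; `UnboxingGuard`, `unboxUnsafe` and `unboxOrDefault` are hypothetical names.

```java
/** Hypothetical illustration of guarding auto-unboxing against null values. */
final class UnboxingGuard {

    private UnboxingGuard() {}

    /** Unsafe: the implicit boxed.longValue() call throws NullPointerException for null. */
    static long unboxUnsafe(Long boxed) {
        return boxed;
    }

    /**
     * Guarded: checks for null before unboxing. The conditional has type long,
     * but boxed is only unboxed when the non-null branch is taken.
     */
    static long unboxOrDefault(Long boxed, long defaultValue) {
        return boxed == null ? defaultValue : boxed;
    }
}
```

When null must stay distinguishable from a real value, the caller would normally carry a separate "is null" flag next to the default primitive instead of relying on a sentinel such as `-1`.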
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 @dawidwys @dianfu I've updated the approach according to the document. Feel free to comment. ---
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 @dianfu Thanks for the review. I found that @dawidwys wrote a draft about the implementation for this JIRA issue. I'll go through it first and address those issues in this PR later. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130317411 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, event); + if (callLevel > 0) { --- End diff -- We need to detect whether there is an infinite loop, and I use `callLevel` to track that here. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130317270 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { --- End diff -- Thanks for pointing it out. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r130317122 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_EVENT: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, event)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getPatternName()) && --- End diff -- Yes, you are right. ---
[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...
Github user yestinchen commented on the issue: https://github.com/apache/flink/pull/4331 @dawidwys Thanks for the review. Problem 1 is easy to fix: we can just start a new match process if the only remaining computation state reaches the stop state. Problem 2 cannot be avoided with the current approach, because it's impossible to know whether there are potential matches. I think the best way to implement this correctly is to try to start a new match process after processing each event, and to discard unfinished match processes after a successful match according to the skip strategy. In order to do that, we need to keep the logical order of the events, which is the original idea I proposed. As for your general notes, I have some ideas: 1. I agree that Oracle's specification is designed for bounded data. But matching on unbounded data is very similar to matching on bounded data, since all data are processed one by one and there's no need for bound information. As for **_empty match_**, I think we can just use Oracle's definition. > Some patterns permit empty matches. For example: PATTERN (A*) can be matched by zero or more rows that are mapped to A. An empty match does not map any rows to primary row pattern variables; nevertheless, an empty match has a starting row. For example, there can be an empty match at the first row of a row pattern partition, an empty match at the second row of a row pattern partition, etc. An empty match is assigned a sequential match number, based on the ordinal position of its starting row, the same as any other match. 2. I feel uncomfortable with the RuntimeExceptions too, but these exceptions are very important for keeping the skip semantics right. I understand your main concern is that exceptions will stop the matching process, which is unacceptable for an online streaming service. To address this, I think we can introduce a default strategy (SKIP_TO_NEXT_EVENT, for example).
If these exceptions happen, we can use the default strategy to continue the match process and switch back to the configured strategy after a successful match. We can also add a switch to let users decide whether to enable this feature. 3. I still think it's useful to support these skip strategies; I don't know why Esper does not support them. 4. Thanks for the related information. I took a brief look at that PR, which is very similar to this one. I wonder why it was closed without being merged into master? Looking forward to your feedback. Thanks. ---
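The fallback proposed in point 2 (switch to a default strategy instead of throwing, restore the configured strategy after a successful match, and let users opt in) can be sketched as a small state holder. This only illustrates the idea under discussion, not code from the PR: `SkipStrategyResolver`, its methods and the `fallbackEnabled` switch are hypothetical, and `SKIP_TO_NEXT_EVENT` is assumed as the default, as suggested above.

```java
/** Hypothetical state holder for the "fall back instead of throwing" proposal. */
final class SkipStrategyResolver {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final Strategy configured;
    private final boolean fallbackEnabled; // the proposed user-facing switch
    private Strategy active;

    SkipStrategyResolver(Strategy configured, boolean fallbackEnabled) {
        this.configured = configured;
        this.fallbackEnabled = fallbackEnabled;
        this.active = configured;
    }

    /** Called where the strict design would throw because the skip cannot be applied. */
    void onInconsistentSkip(String reason) {
        if (!fallbackEnabled) {
            throw new IllegalStateException(reason); // strict mode: surface the problem
        }
        active = Strategy.SKIP_TO_NEXT_EVENT; // keep the job running with the default
    }

    /** Called after a successful match: restore the user's configured strategy. */
    void onMatchEmitted() {
        active = configured;
    }

    Strategy active() {
        return active;
    }
}
```

Keeping the strict behavior as the default when the switch is off preserves the point made above that users should be aware when the matching semantics change.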
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129518272 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_ROW: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getRpv()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges); --- End diff -- Now I keep `startComputationState` instead of `startState` in NFA, so it can calculate the `outgoingEdges` from the start state when needed. Is this right? ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129517227 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + + +/** + * Indicate the skip strategy after a match process. 
+ * There're four kinds of strategies: + * SKIP_PAST_LAST_ROW, + * SKIP_TO_NEXT_ROW, + * SKIP_TO_FIRST_RPV and + * SKIP_TO_LAST_RPV + * + */ +public class AfterMatchSkipStrategy implements Serializable { + + // default strategy + SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW; + + // fields + String rpv = null; + + public AfterMatchSkipStrategy(){ + this(SkipStrategy.SKIP_TO_NEXT_ROW, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy) { + this(strategy, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) { + if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) { + if (rpv == null) { --- End diff -- done ---
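Below is a minimal sketch of the validation discussed here. It uses a hypothetical `SkipStrategySketch` class rather than Flink's `AfterMatchSkipStrategy`. The field is named `patternName`, the name adopted later in this thread in place of `rpv`. As a design choice, the sketch rejects the empty string as well as `null`, so that the check matches the "can not be empty" wording of the error message:

```java
import java.io.Serializable;

/**
 * Hypothetical sketch, not Flink's class: SKIP_TO_FIRST and SKIP_TO_LAST
 * need the name of the pattern to skip to; the other strategies do not.
 */
final class SkipStrategySketch implements Serializable {

    private static final long serialVersionUID = 1L;

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    // Called rpv (row pattern variable) in the quoted diff.
    private final String patternName;

    /** Default strategy, as in the diff. */
    SkipStrategySketch() {
        this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    }

    SkipStrategySketch(SkipStrategy strategy, String patternName) {
        if (strategy == null) {
            throw new IllegalArgumentException("strategy must not be null");
        }
        boolean needsName = strategy == SkipStrategy.SKIP_TO_FIRST
                || strategy == SkipStrategy.SKIP_TO_LAST;
        // Reject both null and "" so the check agrees with the message.
        if (needsName && (patternName == null || patternName.isEmpty())) {
            throw new IllegalArgumentException(
                    "the pattern name can not be empty when SkipStrategy is " + strategy);
        }
        this.strategy = strategy;
        this.patternName = patternName;
    }

    SkipStrategy getStrategy() {
        return strategy;
    }

    String getPatternName() {
        return patternName;
    }
}
```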
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129517254 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,241 @@ +/** + * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies: + * SKIP_PAST_LAST_ROW, + * SKIP_TO_NEXT_ROW, + * SKIP_TO_FIRST_RPV and + * SKIP_TO_LAST_RPV + * + */ +public class AfterMatchSkipStrategy implements Serializable { + + // default strategy + SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW; + + // fields + String rpv = null; + + public AfterMatchSkipStrategy(){ + this(SkipStrategy.SKIP_TO_NEXT_ROW, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy) { + this(strategy, null); + } + + public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) { + if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) { + if (rpv == null) { + throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy); + } + } + this.strategy = strategy; + this.rpv = rpv; + } + + public SkipStrategy getStrategy() { + return strategy; + } + + public String getRpv() { + return rpv; + } + + @Override + public String toString() { + return "AfterMatchStrategy{" + + "strategy=" + strategy + + ", rpv=" + rpv + + '}'; + } + + /** +* Skip Strategy Enum. +*/ + public enum SkipStrategy{ + SKIP_TO_NEXT_ROW, + SKIP_PAST_LAST_ROW, + SKIP_TO_FIRST, + SKIP_TO_LAST + } + + /** +* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. +*/ + public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** +* This empty constructor is required for deserializing the configuration. +*/ + public AfterMatchSkipStrategyConfigSnapshot() { + } + + public AfterMatchSkipStrategyConfigSnapshot( + TypeSerializer enumSerializer, + TypeSerializer stringSerializer) { + + super(enumSerializer, stringSerializer); + } + + @Override + public int getVersion() { + return VERSION; + } + } + + /** +* A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}. +*/ + public static cl
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129517161 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,241 @@ +/** + * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies: + * SKIP_PAST_LAST_ROW, + * SKIP_TO_NEXT_ROW, + * SKIP_TO_FIRST_RPV and + * SKIP_TO_LAST_RPV + * + */ +public class AfterMatchSkipStrategy implements Serializable { + + // default strategy + SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW; + + // fields + String rpv = null; --- End diff -- It stands for Row Pattern Variable. I have already renamed it to `patternName`, which I think is clearer. ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129516786 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java --- @@ -0,0 +1,241 @@ +/** + * Indicate the skip strategy after a match process.
+ * There're four kinds of strategies: + * SKIP_PAST_LAST_ROW, + * SKIP_TO_NEXT_ROW, --- End diff -- I changed `ROW` to `EVENT`. Is that better? ---
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129516650 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java --- @@ -36,6 +40,14 @@ * @return Resulting pattern stream */ public static PatternStream pattern(DataStream input, Pattern<T, ?> pattern) { - return new PatternStream<>(input, pattern); + return new PatternStream<>(input, pattern, skipStrategy); + } + + /** +* Set the pattern's skip strategy after match. +* @param afterMatchSkipStrategy the skip strategy to use. +*/ + public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) { --- End diff -- Moved that into `Pattern`. ---
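The static `pattern(...)` method passes `skipStrategy` to `PatternStream`, so the value set by the static `setAfterMatchSkipStrategy` has to live in static state. Every pattern built through `CEP` shares that state, and whichever call came last applies to all of them. Below is a minimal sketch of the per-pattern alternative. It uses a hypothetical `PatternSketch` class, not Flink's `Pattern` API, which receives the strategy when the pattern is created:

```java
/**
 * Hypothetical sketch: the skip strategy is stored on each pattern object
 * instead of in a static field. Names are illustrative, not Flink's API.
 */
final class PatternSketch {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final String name;
    private final SkipStrategy skipStrategy;

    private PatternSketch(String name, SkipStrategy skipStrategy) {
        this.name = name;
        this.skipStrategy = skipStrategy;
    }

    /** Starts a pattern with the default strategy. */
    static PatternSketch begin(String name) {
        return new PatternSketch(name, SkipStrategy.SKIP_TO_NEXT_ROW);
    }

    /** Starts a pattern with an explicit strategy; no global state is mutated. */
    static PatternSketch begin(String name, SkipStrategy skipStrategy) {
        return new PatternSketch(name, skipStrategy);
    }

    String getName() {
        return name;
    }

    SkipStrategy getSkipStrategy() {
        return skipStrategy;
    }
}
```

Two patterns built back to back keep their own strategies, which a single static field cannot express.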
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user yestinchen commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r129515022 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition edge) { nextVersion, startTimestamp); } + + switch (skipStrategy.getStrategy()) { + case SKIP_PAST_LAST_ROW: + if (nextState.isFinal()) { + resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges)); + } + break; + case SKIP_TO_FIRST: + if (nextState.getName().equals(skipStrategy.getRpv()) && + !nextState.getName().equals(currentState.getName())) { + ComputationState startComputationState = createStartComputationState(computationState, outgoingEdges); + if (callLevel > 0) { + throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query"); + } + // feed current matched event to the state. + Collection<ComputationState> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++); --- End diff -- This is needed because SKIP_TO_FIRST and SKIP_TO_LAST must start the next match at the first or last event matched by the specified pattern. For example, given the event stream `a1, b1, c1, a2` and the pattern `(A B C)`, if we set the SkipStrategy to SKIP_TO_FIRST with pattern name `B`, we should create a new `startComputationState` after `b1` has been processed, and the next match should start at event `b1`. So we need to manually feed `b1` to the newly created `startComputationState`. ---
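The restart rule described above can be modelled without the NFA. The toy sketch below assumes strict contiguity, exactly one event per pattern variable, and that an event such as `b1` matches variable `B` when their first letters agree. `SkipSimulation`, `run` and `nextStart` are hypothetical names, not Flink code. For the example stream `a1, b1, c1, a2` with `(A B C)`, `nextStart` returns index 1 (`b1`) for SKIP_TO_FIRST `B` and index 3 (`a2`) for SKIP_PAST_LAST_ROW. With one event per variable, SKIP_TO_LAST only differs from SKIP_TO_FIRST when a variable appears more than once, e.g. `(A B B C)`:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's NFA) of where the next match attempt
 * starts after a match, under strict contiguity.
 */
final class SkipSimulation {

    enum SkipStrategy { SKIP_TO_NEXT_ROW, SKIP_PAST_LAST_ROW, SKIP_TO_FIRST, SKIP_TO_LAST }

    /** Event "b1" matches variable "B" when the first letters agree. */
    static boolean matches(String event, String variable) {
        return Character.toUpperCase(event.charAt(0)) == variable.charAt(0);
    }

    /** Returns every match as the list of event indices it consumed. */
    static List<List<Integer>> run(List<String> events, List<String> pattern,
                                   SkipStrategy strategy, String target) {
        List<List<Integer>> result = new ArrayList<>();
        int start = 0;
        while (start + pattern.size() <= events.size()) {
            List<Integer> match = new ArrayList<>();
            for (int k = 0; k < pattern.size() && matches(events.get(start + k), pattern.get(k)); k++) {
                match.add(start + k);
            }
            if (match.size() < pattern.size()) {
                start++; // no match begins here; try the next event
                continue;
            }
            result.add(match);
            int next = nextStart(strategy, target, pattern, match);
            if (next <= start) {
                // Restarting at the match's own first event would find it forever.
                throw new IllegalStateException("skip target does not advance past " + events.get(start));
            }
            start = next;
        }
        return result;
    }

    /** Index of the event where the next match attempt begins. */
    static int nextStart(SkipStrategy strategy, String target, List<String> pattern, List<Integer> match) {
        switch (strategy) {
            case SKIP_PAST_LAST_ROW:
                return match.get(match.size() - 1) + 1;
            case SKIP_TO_FIRST:
                return match.get(positionOf(pattern.indexOf(target), target));
            case SKIP_TO_LAST:
                return match.get(positionOf(pattern.lastIndexOf(target), target));
            default: // SKIP_TO_NEXT_ROW
                return match.get(0) + 1;
        }
    }

    private static int positionOf(int index, String target) {
        if (index < 0) {
            throw new IllegalArgumentException("pattern variable not found: " + target);
        }
        return index;
    }
}
```

Instead of a recursion-depth counter, the sketch refuses any skip target that does not move past the start of the current match. That is exactly the case (for example SKIP_TO_FIRST on the pattern's first variable) that would find the same match forever. If a depth counter is kept instead, note that `callLevel++` passes the old value to the callee; `callLevel + 1` is what hands it the incremented depth.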
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
GitHub user yestinchen opened a pull request: https://github.com/apache/flink/pull/4331 [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/yestinchen/flink FLINK-7169 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4331.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4331 commit 28f2e0ab2b6fd38864017fc64d2c76a65c8f7574 Author: Yestin <873915...@qq.com> Date: 2017-07-14T08:41:51Z [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP ---