[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701924#comment-16701924 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys closed pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala index 33d24de7748..91a15032a91 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -113,7 +113,7 @@ class PatternTest { assertTrue(pattern.getCondition.isDefined) assertTrue(previous.getCondition.isDefined) -assertFalse(preprevious.getCondition.isDefined) +assertTrue(preprevious.getCondition.isDefined) assertEquals(pattern.getName, "end") assertEquals(previous.getName, "next") diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java new file mode 100644 index 000..ff6e610845c --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700069#comment-16700069 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441975719 @dawidwys Thanks a lot for your latest review comments and have updated the PR accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699088#comment-16699088 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754 Hi @dianfu, Just a few comments, before we are ready to go with this PR: 1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a comment that they exist only for backwards compatibility and advice to use the rich versions. (Agreed, let's do this in a separate PR/issue: https://issues.apache.org/jira/browse/FLINK-10113) 2. Mark all the conditions in `conditions` package `@Internal` 3. I would be in favor of unifying the null handling in the new classes in a way that we do not allow nulls there. We can change the `getCondition` method in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. This will make the whole logic more explicit IMHO. Let's leave the old classes as they are, as we want to drop them anyway. 4. Use the new rich conditions in `Pattern#where` and `Pattern#or`. Add a test that rich interfaces work for those methods. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699073#comment-16699073 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236285885 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java ## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichPatternSelectFunction() throws Exception { + RichPatternSelectFunction function = new RichPatternSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map> pattern) throws Exception { + return null; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + RichPatternFlatSelectFunction function = + new RichPatternFlatSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map> pattern, Collector out) throws Exception { + // no op + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichIterativeConditionFunction() throws Exception { + RichIterativeCondition function = new RichIterativeCondition() { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter( + Integer value, Context ctx) throws Exception { + return false; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichCompositeIterativeCondition() throws
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699074#comment-16699074 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236286156 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java ## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichPatternSelectFunction() throws Exception { + RichPatternSelectFunction function = new RichPatternSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map> pattern) throws Exception { + return null; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + RichPatternFlatSelectFunction function = + new RichPatternFlatSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map> pattern, Collector out) throws Exception { + // no op + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichIterativeConditionFunction() throws Exception { + RichIterativeCondition function = new RichIterativeCondition() { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter( + Integer value, Context ctx) throws Exception { + return false; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichCompositeIterativeCondition() throws
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699090#comment-16699090 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754 Hi @dianfu, Just a few comments, before we are ready to go with this PR: 1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a comment that they exist only for backwards compatibility and advice to use the rich versions. (Agreed, let's do this in a separate PR/issue: https://issues.apache.org/jira/browse/FLINK-10113) 2. Mark all the conditions in `conditions` package `@Internal` 3. I would be in favor of unifying the null handling in the new classes in a way that we do not allow nulls there. We can change the `getCondition` method in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. This will make the whole logic more explicit IMHO. Let's leave the old classes as they are, as we want to drop them anyway. 4. Use the new rich conditions in `Pattern#where` and `Pattern#or`. Add a test that checks if rich interfaces work for those methods. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699086#comment-16699086 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754 Hi @dianfu, Just a few comments, before we are ready to go with this PR: 1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a comment that they exist only for backwards compatibility and advice to use the rich versions. (Agreed, let's do this in a separate PR/issue: https://issues.apache.org/jira/browse/FLINK-10113) 2. Mark all the conditions in `conditions` package `@Internal` 3. I would be in favor of unifying the null handling in the new classes in a way that we do not allow nulls there. We can change the `getCondition` method in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. This will make the whole logic more explicit IMHO. Let's leave the old classes as they are, as we want to drop them anyway. 4. Use the new rich conditions in `Pattern#where` and `Pattern#or` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699085#comment-16699085 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236287295 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java ## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichPatternSelectFunction() throws Exception { + RichPatternSelectFunction function = new RichPatternSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map> pattern) throws Exception { + return null; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + RichPatternFlatSelectFunction function = + new RichPatternFlatSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map> pattern, Collector out) throws Exception { + // no op + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichIterativeConditionFunction() throws Exception { + RichIterativeCondition function = new RichIterativeCondition() { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter( + Integer value, Context ctx) throws Exception { + return false; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichCompositeIterativeCondition() throws
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699084#comment-16699084 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236287295 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java ## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichPatternSelectFunction() throws Exception { + RichPatternSelectFunction function = new RichPatternSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map> pattern) throws Exception { + return null; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + RichPatternFlatSelectFunction function = + new RichPatternFlatSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map> pattern, Collector out) throws Exception { + // no op + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichIterativeConditionFunction() throws Exception { + RichIterativeCondition function = new RichIterativeCondition() { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter( + Integer value, Context ctx) throws Exception { + return false; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichCompositeIterativeCondition() throws
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699081#comment-16699081 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236286828 ## File path: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java ## @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.cep.pattern.conditions.RichAndCondition; +import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; +import org.apache.flink.cep.pattern.conditions.RichNotCondition; +import org.apache.flink.cep.pattern.conditions.RichOrCondition; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test cases for {@link CepRuntimeContext}. + */ +public class CepRuntimeContextTest { + + @Test + public void testRichPatternSelectFunction() throws Exception { + RichPatternSelectFunction function = new RichPatternSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Integer select(Map> pattern) throws Exception { + return null; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichPatternFlatSelectFunction() throws Exception { + RichPatternFlatSelectFunction function = + new RichPatternFlatSelectFunction() { + private static final long serialVersionUID = 1L; + + @Override + public void flatSelect(Map> pattern, Collector out) throws Exception { + // no op + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichIterativeConditionFunction() throws Exception { + RichIterativeCondition function = new RichIterativeCondition() { + private static final long serialVersionUID = 1L; + + @Override + public boolean filter( + Integer value, Context ctx) throws Exception { + return false; + } + }; + + verifyRuntimeContext(function); + } + + @Test + public void testRichCompositeIterativeCondition() throws
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699069#comment-16699069 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754 Hi @dianfu, Just a few comments, before we are ready to go with this PR: 1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a comment that they exist only for backwards compatibility and advice to use the rich versions. (Agreed, let's do this in a separate PR/issue: https://issues.apache.org/jira/browse/FLINK-10113) 2. Mark all the conditions in `conditions` package `@Internal` 3. I would be in favor of unifying the null handling in the new classes in a way that we do not allow nulls there. We can change the `getCondition` method in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. This will make the whole logic more explicit IMHO. Let's leave the old classes as they are, as we want to drop them anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698923#comment-16698923 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236253748 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: By `condition() == null` I meant actually evaluating the condition, whereas `condition == null` means that a condition was provided at all. In other words, what I wanted to say here is that the mentioned three state logic does not apply here at all. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698921#comment-16698921 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r236253748 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: By `condition() == null` I meant actually evaluating the condition, whereas `condition == null` means that a condition was provided at all. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698361#comment-16698361 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-441486964 @dianfu Thanks for the update! LGTM. +1 to merged This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695574#comment-16695574 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235608590 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: @dawidwys Thank you for the reminder!Can you explain more about `condition() == null`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694422#comment-16694422 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235301974 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: @sunjincheng121 Note that `condition == null` is differente from `condition() == null`. It is for cases when condition is not specified at all. This is in line with the `MATCH_RECOGNIZE` standard that if a condition for a pattern variable is not specified then all rows will be matched against that variable. Also just a comment we do not have to always align to SQL standard in CEP library, as long as we can maintain the compatibility in the SQL -> CEP translation. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694297#comment-16694297 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235268470 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: @dianfu @dawidwys In a traditional database, such as ORACLE the CONDITION has three states TRUE, FALSE and UNKNOW. The key points is how we handle UNKNOW in Flink. [Please see detail](https://docs.oracle.com/cd/E17904_01/apirefs./e12048/conditions.htm#CQLLR281) At current time in the `checkFilterCondition()` of `NFA` aways true if `IterativeCondition` is `null`. ![image](https://user-images.githubusercontent.com/22488084/48823207-e951d180-ed9a-11e8-904a-434ff3bb9d60.png) So we can refer to existing behaviors, and I think you are right @dianfu . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694243#comment-16694243 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235254991 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code OR} and returns {@code true} if at least one is {@code true}. + * + * @param Type of the element to filter + */ +public class RichOrCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichOrCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return left == null || right == null || left.filter(value, ctx) || right.filter(value, ctx); + } Review comment: Same as above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694242#comment-16694242 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235254880 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: I think the implementation depends on the semantics of null condition. Per my understanding, the null condition means that the condition always holds no matter what the input element is. @dawidwys what's your thought? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694195#comment-16694195 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235247291 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return (left == null || left.filter(value, ctx)) && (right == null || right.filter(value, ctx)); Review comment: left != null && right != null && left.filter(value, ctx) && right.filter(value, ctx) ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694194#comment-16694194 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235247496 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code OR} and returns {@code true} if at least one is {@code true}. + * + * @param Type of the element to filter + */ +public class RichOrCondition extends RichCompositeIterativeCondition { + + private static final long serialVersionUID = 1L; + + public RichOrCondition(final IterativeCondition left, final IterativeCondition right) { + super(left, right); + } + + @Override + public boolean filter(T value, Context ctx) throws Exception { + IterativeCondition left = getLeft(); + IterativeCondition right = getRight(); + return left == null || right == null || left.filter(value, ctx) || right.filter(value, ctx); + } Review comment: (left != null && left.filter(value, ctx) ) || (right != null && right.filter(value, ctx)) ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694186#comment-16694186 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235246730 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: Sounds good! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694172#comment-16694172 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235244088 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return runtimeContext.getNumberOfParallelSubtasks(); + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return runtimeContext.getMaxNumberOfParallelSubtasks(); + } + + @Override + public int getIndexOfThisSubtask() { + return runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return runtimeContext.getTaskNameWithSubtasks(); + } + + @Override + public ExecutionConfig getExecutionConfig() { + return runtimeContext.getExecutionConfig(); + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return runtimeContext.getUserCodeClassLoader(); + } + + // --- + // Unsupported operations + // --- + + @Override + public void addAccumulator( + String name, Accumulator accumulator) { +
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694167#comment-16694167 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235243736 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param Type of the input elements + * @param Type of the output element + */ +public abstract class RichPatternFlatSelectFunction Review comment: Make sense. Have added some test cases. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694165#comment-16694165 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235243602 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichIterativeCondition { Review comment: I think we need to check the null condition. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694162#comment-16694162 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235243458 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: +1 to unify the behavior. Have updated the PR. I mention the test case NFAITCase#testNoConditionNFA is just to explain that we need to consider null. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694163#comment-16694163 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r235243458 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: +1 to unify the behavior. Have updated the PR. I mention the test case NFAITCase#testNoConditionNFA is just to explain that we need to consider null. :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692974#comment-16692974 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234932717 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { Review comment: CepRuntimeContext is also needed by RichIterativeCondition which is in another package which preventing us to do so. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692965#comment-16692965 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu edited a comment on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-440209346 @dawidwys @sunjincheng121 Thanks a lot for your review comments. Have updated the PR. > I would like to postpone it a little bit though until we figure out if we can drop support for deserializing conditions. If so I think we should drop the non-rich versions of conditions like And/Or/Not etc. This means that we will drop the migration support of jobs <= 1.5. Maybe we can create another ticket to track that as it also includes other changes such as removing NFASerializer etc. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692964#comment-16692964 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-440209346 @dawidwys @sunjincheng121 Thanks a lot for your review comments. Have updated the PR. > I would like to postpone it a little bit though until we figure out if we can drop support for deserializing conditions. If so I think we should drop the non-rich versions of conditions like And/Or/Not etc. This means that we will drop the migration support of jobs <= 1.5. Maybe we can create another ticket to track that as it also includes other changes such as removing NFASerializer etc. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692870#comment-16692870 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234913483 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return runtimeContext.getNumberOfParallelSubtasks(); + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return runtimeContext.getMaxNumberOfParallelSubtasks(); + } + + @Override + public int getIndexOfThisSubtask() { + return runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return runtimeContext.getTaskNameWithSubtasks(); + } + + @Override + public ExecutionConfig getExecutionConfig() { + return runtimeContext.getExecutionConfig(); + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return runtimeContext.getUserCodeClassLoader(); + } + + // --- + // Unsupported operations + // --- + + @Override + public void addAccumulator( + String name, Accumulator accumulator)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692860#comment-16692860 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234910601 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: Yes, I see that case, but why can't we let `RichAnd` and `RichOr` have default handling for null? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692854#comment-16692854 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234909395 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichIterativeCondition { Review comment: Variable parameters are a good idea! Does `RichNot ` need check null of Condition ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692833#comment-16692833 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234906587 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return runtimeContext.getNumberOfParallelSubtasks(); + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return runtimeContext.getMaxNumberOfParallelSubtasks(); + } + + @Override + public int getIndexOfThisSubtask() { + return runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return runtimeContext.getTaskNameWithSubtasks(); + } + + @Override + public ExecutionConfig getExecutionConfig() { + return runtimeContext.getExecutionConfig(); + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return runtimeContext.getUserCodeClassLoader(); + } + + // --- + // Unsupported operations + // --- + + @Override + public void addAccumulator( + String name, Accumulator accumulator) { +
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692841#comment-16692841 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234907461 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: +1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692842#comment-16692842 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234907686 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { Review comment: Could we make this class package protected? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692840#comment-16692840 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234907432 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param Type of the input elements + * @param Type of the output element + */ +public abstract class RichPatternFlatSelectFunction Review comment: +1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692839#comment-16692839 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234907384 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { Review comment: This is an internal name in CEP library and it follows the naming convention of other such classes e.g. `RichAsyncFunctionRuntimeContext`, `IterationRuntimeUDFContext` etc. So I would be for using the `RuntimeContext` suffix. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692838#comment-16692838 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234907168 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: I think this is by design as CEP support patterns where there are no conditions. You can refer to the test case of NFAITCase#testNoConditionNFA. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692824#comment-16692824 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234904578 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichIterativeCondition { Review comment: I though about sth like this: abstract class RichCompositeCondition extends RichIterativeCondition { private IterativeCondition[] conditions; RichCompositeCondition(IterativeCondition... conditions) { this.conditions = conditions; } @Override public void setRuntimeContext(RuntimeContext runtimeContext) { super.setRuntimeContext(runtimeContext); for (IterativeCondition condition : conditions) { FunctionUtils.setFunctionRuntimeContext(condition, runtimeContext); } } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); for (IterativeCondition condition : conditions) { openFunction(condition, parameters); } } @Override public void close() throws Exception { super.close(); for (IterativeCondition condition : conditions) { closeFunction(condition); } } } public class RichAndCondition extends RichCompositeCondition { private static final long serialVersionUID = 1L; private final IterativeCondition left; private final IterativeCondition right; public RichAndCondition(final IterativeCondition left, final IterativeCondition right) { super(left, right); this.left = Preconditions.checkNotNull(left, "The condition cannot be null."); this.right = Preconditions.checkNotNull(right, "The condition cannot be null."); } @Override public boolean filter(T value, Context ctx) throws Exception { return left.filter(value, ctx) && right.filter(value, ctx); } /** * @return One of the {@link IterativeCondition conditions} combined in this condition. */ public IterativeCondition getLeft() { return left; } /** * @return One of the {@link IterativeCondition conditions} combined in this condition. */ public IterativeCondition getRight() { return right; } } public class RichNotCondition extends RichCompositeCondition { private static final long serialVersionUID = 1L; private final IterativeCondition original; public RichNotCondition(final IterativeCondition original) { super(original); this.original = Preconditions.checkNotNull(original); } @Override public boolean filter(T value, Context ctx) throws Exception { return original != null &&
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692768#comment-16692768 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on issue #5080: [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper URL: https://github.com/apache/flink/pull/5080#issuecomment-440170762 Close this PR as it is duplicate with #7110 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692764#comment-16692764 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu closed pull request #5080: [FLINK-8159] [cep] Add rich support for SelectWrapper and FlatSelectWrapper URL: https://github.com/apache/flink/pull/5080 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java index 4423bb1dd40..95225271b9e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java @@ -20,7 +20,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.EventComparator; @@ -28,6 +29,7 @@ import org.apache.flink.cep.PatternFlatTimeoutFunction; import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.util.OutputTag; @@ -102,7 +104,7 @@ protected void processTimedOutSequences( * in one udf. */ @Internal - public static class FlatSelectWrapper implements Function { + public static class FlatSelectWrapper extends AbstractRichFunction { private static final long serialVersionUID = -8320546120157150202L; @@ -125,5 +127,25 @@ public FlatSelectWrapper( this.flatSelectFunction = flatSelectFunction; this.flatTimeoutFunction = flatTimeoutFunction; } + + @Override + public void open(Configuration parameters) throws Exception { + if (flatSelectFunction instanceof RichFunction) { + ((RichFunction) flatSelectFunction).open(parameters); + } + if (flatTimeoutFunction instanceof RichFunction) { + ((RichFunction) flatTimeoutFunction).open(parameters); + } + } + + @Override + public void close() throws Exception { + if (flatSelectFunction instanceof RichFunction) { + ((RichFunction) flatSelectFunction).close(); + } + if (flatTimeoutFunction instanceof RichFunction) { + ((RichFunction) flatTimeoutFunction).close(); + } + } } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java index cb233a486ec..18a1454cf4c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java @@ -19,7 +19,8 @@ package org.apache.flink.cep.operator; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.EventComparator; @@ -27,6 +28,7 @@ import org.apache.flink.cep.PatternTimeoutFunction; import org.apache.flink.cep.nfa.AfterMatchSkipStrategy; import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; @@ -54,8 +56,8 @@ public SelectTimeoutCepOperator( NFACompiler.NFAFactory nfaFactory, final EventComparator comparator, AfterMatchSkipStrategy skipStrategy, -
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692740#comment-16692740 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234878521 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { + + private final RuntimeContext runtimeContext; + + public CepRuntimeContext(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public String getTaskName() { + return runtimeContext.getTaskName(); + } + + @Override + public MetricGroup getMetricGroup() { + return runtimeContext.getMetricGroup(); + } + + @Override + public int getNumberOfParallelSubtasks() { + return runtimeContext.getNumberOfParallelSubtasks(); + } + + @Override + public int getMaxNumberOfParallelSubtasks() { + return runtimeContext.getMaxNumberOfParallelSubtasks(); + } + + @Override + public int getIndexOfThisSubtask() { + return runtimeContext.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return runtimeContext.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return runtimeContext.getTaskNameWithSubtasks(); + } + + @Override + public ExecutionConfig getExecutionConfig() { + return runtimeContext.getExecutionConfig(); + } + + @Override + public ClassLoader getUserCodeClassLoader() { + return runtimeContext.getUserCodeClassLoader(); + } + + // --- + // Unsupported operations + // --- + + @Override + public void addAccumulator( + String name, Accumulator accumulator)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692742#comment-16692742 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234882983 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichIterativeCondition { Review comment: I agree to deal with the open and close methods in a way that adds abstract classes, and RichAnd is very similar to RichOr, but how do we deal with the difference between RichNot and RichAnd/RichOr? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692741#comment-16692741 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234885267 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Map; + +/** + * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the + * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: + * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and + * {@link RichFunction#close()}. + * + * @param Type of the input elements + * @param Type of the output element + */ +public abstract class RichPatternFlatSelectFunction Review comment: Add the test case in `CEPITCase`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692739#comment-16692739 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234885688 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; + +/** + * A {@link RichIterativeCondition condition} which negates the condition it wraps + * and returns {@code true} if the original condition returns {@code false}. + * + * @param Type of the element to filter + */ +public class RichNotCondition extends RichIterativeCondition { + + private static final long serialVersionUID = 1L; + + private final IterativeCondition original; + + public RichNotCondition(final IterativeCondition original) { + this.original = original; Review comment: I think we should unify the default behavior for null in `RichOr` , `RichAnd` and `RichNot` What do you think? @dianfu @dawidwys This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692738#comment-16692738 ] ASF GitHub Bot commented on FLINK-8159: --- sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234883989 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.DoubleCounter; +import org.apache.flink.api.common.accumulators.Histogram; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.metrics.MetricGroup; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}. + * The runtime context only supports basic operations. Consequently, state access, accumulators, + * broadcast variables and the distributed cache are disabled. + */ +public class CepRuntimeContext implements RuntimeContext { Review comment: From the points of my view, I like change `CepRuntimeContext` to `CepFunctionContext`, Because we only need to provide the functions which will be using in the user-defined function, it will not expose many runtime functions, such as: `getXXXState` low-level operations. But i am fine if you want using `CepRuntimeContext` , What do you think @dawidwys @dianfu This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691744#comment-16691744 ] ASF GitHub Bot commented on FLINK-8159: --- dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#discussion_r234631211 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern.conditions; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Preconditions; + +/** + * A {@link RichIterativeCondition condition} which combines two conditions with a logical + * {@code AND} and returns {@code true} if both are {@code true}. + * + * @param Type of the element to filter + */ +public class RichAndCondition extends RichIterativeCondition { Review comment: How about we introduce `RichCompositeIterativeCondition` that will get list of conditions in the ctor and will handle the `open/close/setRuntimeContext` methods in one place? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688910#comment-16688910 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu commented on issue #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110#issuecomment-439258573 Hi @dawidwys, could you help to take a look at this PR? Thanks a lot. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688179#comment-16688179 ] ASF GitHub Bot commented on FLINK-8159: --- dianfu opened a new pull request #7110: [FLINK-8159] [cep] Pattern(Flat)SelectFunctions should support RichFunction interface URL: https://github.com/apache/flink/pull/7110 [FLINK-6938] [cep] IterativeCondition should support RichFunction interface ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687524#comment-16687524 ] Dian Fu commented on FLINK-8159: Thanks a lot for the reply and advice. I will submit a PR later today. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686705#comment-16686705 ] Dawid Wysakowicz commented on FLINK-8159: - Ad. 1 Agree with that approach with adding additional abstract class. But my original question was about using a new {{PatternSelectFunction}}, but I don't think we should support dynamically changing it, that should be possible only for {{IterativeCondition}} Ad. 2 Agreed Ad. 3 I know we could, but I don't think we should expose everything. actually I would limit its capabilities to a minimum, e.g. prohibit state creation, accumulators etc. This is of great importance especially for {{IterativeCondition}} I think as we have somewhat similar view on that issue, I think we could start with the implementation if you are still interested ;). Also last but not least sorry for the late response. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662091#comment-16662091 ] Dian Fu commented on FLINK-8159: Thanks [~dawidwys] for your feedback. 1. I prefer adding an abstract class RichPattern(Flat)SelectFunction which implements Pattern(Flat)SelectFunction for the following reasons: 1) consist with the way MapFunction/RichMapFunction and all other rich functions take 2) there are no backwards compatibility issues at all 3) If we update Pattern(Flat)SelectFunction to RichPattern(Flat)SelectFunction, I think the backwards compatibility issues can not be solved at all as the old class doesn't exist yet. 2. We can create a class named WrappingRichPattern(Flat)SelectFunction and holding an instance of it in AbstractKeyedCEPPatternOperator. When NFA updated, we create a new RichPattern(Flat)SelectFunction, open it and set it as the currently wrapped function. 3. The RichPattern(Flat)SelectFunction is an child class of AbstractRichFunction and so RuntimeContext is exposed publicly. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661863#comment-16661863 ] Dawid Wysakowicz commented on FLINK-8159: - Hi [~dian.fu], I would like to see a bit more insights. # Which version do you think is better? Updating the select function or not? Why? # How do you want to pass parameters to open() on condtions for new NFA? # To what extent do you want to provide access to RuntimeContext? Sorry, I cannot fully participate in the design of this feature, but we are currently in the process of preparing the 1.7 release. I might be more responsive after this is done. > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface
[ https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656534#comment-16656534 ] Dian Fu commented on FLINK-8159: Hi [~dawidwys], For the concern #1 you described: my init thoughts was to add an abstract class RichPattern(Flat)SelectFunction and leave Pattern(Flat)SelectFunction as it is (follow the same way as Function/RichFunction). So I think there will be no backwards compatibility issues. For the concern#2: I'm still trying to understand the problem. In my mind, there are two kinds of functions to consider: IterativeCondition and the PatternSelectFunction. When new pattern(which contains IterativeCondition) comes (may be PatternSelectFunction will also be dynamically changed?): For IterativeCondition, we need to recompile the NFA and call the open method of the IterativeConditions which contained in the NFA. For PatternSelectFunction, if it's updated, we just call the open method of the newly come PatternSelectFunction and set it as the currently used function. If not, we don't need do anything. Thoughts? > Pattern(Flat)SelectFunctions should support RichFunction interface > -- > > Key: FLINK-8159 > URL: https://issues.apache.org/jira/browse/FLINK-8159 > Project: Flink > Issue Type: Sub-task > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > {{SelectWrapper}} and {{FlatSelectWrapper}} should extends > {{AbstractRichFucntion}} and process properly if the underlying functions > extend RichFunction. > Things to be very careful about: > * backwards compatibility (we previously serialized conditions) - changes to > those interfaces have to be done carefully > * we want to be able to add dynamic patterns in the future, so at some point > we have to open also on control message arrival -- This message was sent by Atlassian JIRA (v7.6.3#76005)