[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16701924#comment-16701924
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys closed pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index 33d24de7748..91a15032a91 100644
--- 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -113,7 +113,7 @@ class PatternTest {
 
 assertTrue(pattern.getCondition.isDefined)
 assertTrue(previous.getCondition.isDefined)
-assertFalse(preprevious.getCondition.isDefined)
+assertTrue(preprevious.getCondition.isDefined)
 
 assertEquals(pattern.getName, "end")
 assertEquals(previous.getName, "next")
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
new file mode 100644
index 000..ff6e610845c
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+   private final RuntimeContext runtimeContext;
+
+   public CepRuntimeContext(RuntimeContext runtimeContext) {
+   this.runtimeContext = runtimeContext;
+   }
+
+   @Override
+   public String getTaskName() {
+   return runtimeContext.getTaskName();
+   }
+
+   @Override
+   public MetricGroup getMetricGroup() {
+   return runtimeContext.getMetricGroup();
+   }
+
+   @Override
+   public int getNumberOfParallelSubtasks() {
+   return 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16700069#comment-16700069
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441975719
 
 
   @dawidwys Thanks a lot for your latest review comments and have updated the 
PR accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699088#comment-16699088
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754
 
 
   Hi @dianfu,
   Just a few comments, before we are ready to go with this PR:
   
   1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a 
comment that they exist only for backwards compatibility and advice to use the 
rich versions. (Agreed, let's do this in a separate PR/issue: 
https://issues.apache.org/jira/browse/FLINK-10113)
   2. Mark all the conditions in `conditions` package `@Internal`
   3. I would be in favor of unifying the null handling in the new classes in a 
way that we do not allow nulls there. We can change the `getCondition` method 
in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. 
This will make the whole logic more explicit IMHO. Let's leave the old classes 
as they are, as we want to drop them anyway.
   4. Use the new rich conditions in `Pattern#where` and `Pattern#or`. Add a 
test that rich interfaces work for those methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699073#comment-16699073
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236285885
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 ##
 @@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+   @Test
+   public void testRichPatternSelectFunction() throws Exception {
+   RichPatternSelectFunction function = new 
RichPatternSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Integer select(Map> 
pattern) throws Exception {
+   return null;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichPatternFlatSelectFunction() throws Exception {
+   RichPatternFlatSelectFunction function =
+   new RichPatternFlatSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public void flatSelect(Map> pattern, Collector out) throws Exception {
+   // no op
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichIterativeConditionFunction() throws Exception {
+   RichIterativeCondition function = new 
RichIterativeCondition() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public boolean filter(
+   Integer value, Context ctx) throws 
Exception {
+   return false;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichCompositeIterativeCondition() throws 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699074#comment-16699074
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236286156
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 ##
 @@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+   @Test
+   public void testRichPatternSelectFunction() throws Exception {
+   RichPatternSelectFunction function = new 
RichPatternSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Integer select(Map> 
pattern) throws Exception {
+   return null;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichPatternFlatSelectFunction() throws Exception {
+   RichPatternFlatSelectFunction function =
+   new RichPatternFlatSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public void flatSelect(Map> pattern, Collector out) throws Exception {
+   // no op
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichIterativeConditionFunction() throws Exception {
+   RichIterativeCondition function = new 
RichIterativeCondition() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public boolean filter(
+   Integer value, Context ctx) throws 
Exception {
+   return false;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichCompositeIterativeCondition() throws 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699090#comment-16699090
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754
 
 
   Hi @dianfu,
   Just a few comments, before we are ready to go with this PR:
   
   1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a 
comment that they exist only for backwards compatibility and advice to use the 
rich versions. (Agreed, let's do this in a separate PR/issue: 
https://issues.apache.org/jira/browse/FLINK-10113)
   2. Mark all the conditions in `conditions` package `@Internal`
   3. I would be in favor of unifying the null handling in the new classes in a 
way that we do not allow nulls there. We can change the `getCondition` method 
in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. 
This will make the whole logic more explicit IMHO. Let's leave the old classes 
as they are, as we want to drop them anyway.
   4. Use the new rich conditions in `Pattern#where` and `Pattern#or`. Add a 
test that checks if rich interfaces work for those methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699086#comment-16699086
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys edited a comment on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754
 
 
   Hi @dianfu,
   Just a few comments, before we are ready to go with this PR:
   
   1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a 
comment that they exist only for backwards compatibility and advice to use the 
rich versions. (Agreed, let's do this in a separate PR/issue: 
https://issues.apache.org/jira/browse/FLINK-10113)
   2. Mark all the conditions in `conditions` package `@Internal`
   3. I would be in favor of unifying the null handling in the new classes in a 
way that we do not allow nulls there. We can change the `getCondition` method 
in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. 
This will make the whole logic more explicit IMHO. Let's leave the old classes 
as they are, as we want to drop them anyway.
   4. Use the new rich conditions in `Pattern#where` and `Pattern#or`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699085#comment-16699085
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236287295
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 ##
 @@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+   @Test
+   public void testRichPatternSelectFunction() throws Exception {
+   RichPatternSelectFunction function = new 
RichPatternSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Integer select(Map> 
pattern) throws Exception {
+   return null;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichPatternFlatSelectFunction() throws Exception {
+   RichPatternFlatSelectFunction function =
+   new RichPatternFlatSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public void flatSelect(Map> pattern, Collector out) throws Exception {
+   // no op
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichIterativeConditionFunction() throws Exception {
+   RichIterativeCondition function = new 
RichIterativeCondition() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public boolean filter(
+   Integer value, Context ctx) throws 
Exception {
+   return false;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichCompositeIterativeCondition() throws 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699084#comment-16699084
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236287295
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 ##
 @@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+   @Test
+   public void testRichPatternSelectFunction() throws Exception {
+   RichPatternSelectFunction function = new 
RichPatternSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Integer select(Map> 
pattern) throws Exception {
+   return null;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichPatternFlatSelectFunction() throws Exception {
+   RichPatternFlatSelectFunction function =
+   new RichPatternFlatSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public void flatSelect(Map> pattern, Collector out) throws Exception {
+   // no op
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichIterativeConditionFunction() throws Exception {
+   RichIterativeCondition function = new 
RichIterativeCondition() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public boolean filter(
+   Integer value, Context ctx) throws 
Exception {
+   return false;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichCompositeIterativeCondition() throws 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699081#comment-16699081
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236286828
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
 ##
 @@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+   @Test
+   public void testRichPatternSelectFunction() throws Exception {
+   RichPatternSelectFunction function = new 
RichPatternSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public Integer select(Map> 
pattern) throws Exception {
+   return null;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichPatternFlatSelectFunction() throws Exception {
+   RichPatternFlatSelectFunction function =
+   new RichPatternFlatSelectFunction() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public void flatSelect(Map> pattern, Collector out) throws Exception {
+   // no op
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichIterativeConditionFunction() throws Exception {
+   RichIterativeCondition function = new 
RichIterativeCondition() {
+   private static final long serialVersionUID = 1L;
+
+   @Override
+   public boolean filter(
+   Integer value, Context ctx) throws 
Exception {
+   return false;
+   }
+   };
+
+   verifyRuntimeContext(function);
+   }
+
+   @Test
+   public void testRichCompositeIterativeCondition() throws 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699069#comment-16699069
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441666754
 
 
   Hi @dianfu,
   Just a few comments, before we are ready to go with this PR:
   
   1. Let's deprecate `OrCondition`, `AndCondition`, `NotCondition` with a 
comment that they exist only for backwards compatibility and advice to use the 
rich versions. (Agreed, let's do this in a separate PR/issue: 
https://issues.apache.org/jira/browse/FLINK-10113)
   2. Mark all the conditions in `conditions` package `@Internal`
   3. I would be in favor of unifying the null handling in the new classes in a 
way that we do not allow nulls there. We can change the `getCondition` method 
in `Pattern` to return `BooleanConditions#trueFunction` if nothing was set. 
This will make the whole logic more explicit IMHO. Let's leave the old classes 
as they are, as we want to drop them anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698923#comment-16698923
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236253748
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   By `condition() == null` I meant actually evaluating the condition, whereas 
`condition == null` means that a condition was provided at all.
   
   In other words, what I wanted to say here is that the mentioned three state 
logic does not apply here at all.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698921#comment-16698921
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r236253748
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   By `condition() == null` I meant actually evaluating the condition, whereas 
`condition == null` means that a condition was provided at all.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16698361#comment-16698361
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-441486964
 
 
   @dianfu Thanks for the update! 
   LGTM. +1 to merged
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695574#comment-16695574
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235608590
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   @dawidwys Thank you for the reminder!Can you explain more about `condition() 
== null`? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694422#comment-16694422
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235301974
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   @sunjincheng121 Note that `condition == null` is differente from 
`condition() == null`. It is for cases when condition is not specified at all. 
   
   This is in line with the `MATCH_RECOGNIZE` standard that if a condition for 
a pattern variable is not specified then all rows will be matched against that 
variable. Also just a comment we do not have to always align to SQL standard in 
CEP library, as long as we can maintain the compatibility in the SQL -> CEP 
translation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694297#comment-16694297
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235268470
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   @dianfu @dawidwys In a traditional database, such as ORACLE
   the CONDITION has three states TRUE, FALSE and UNKNOW. The key points is how 
we handle UNKNOW in Flink.  [Please see 
detail](https://docs.oracle.com/cd/E17904_01/apirefs./e12048/conditions.htm#CQLLR281)
   
   At current time in the `checkFilterCondition()` of `NFA`  aways true if 
`IterativeCondition` is `null`. 
   
![image](https://user-images.githubusercontent.com/22488084/48823207-e951d180-ed9a-11e8-904a-434ff3bb9d60.png)
 
   So we can refer to existing behaviors, and I think you are right @dianfu .
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694243#comment-16694243
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235254991
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code OR} and returns {@code true} if at least one is {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichOrCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichOrCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return left == null || right == null || left.filter(value, ctx) 
|| right.filter(value, ctx);
+   }
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694242#comment-16694242
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235254880
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   I think the implementation depends on the semantics of null condition. Per 
my understanding, the null condition means that the condition always holds no 
matter what the input element is. @dawidwys what's your thought?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694195#comment-16694195
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235247291
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return (left == null || left.filter(value, ctx)) && (right == 
null || right.filter(value, ctx));
 
 Review comment:
   left != null &&  right != null && left.filter(value, ctx) && 
right.filter(value, ctx) ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694194#comment-16694194
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235247496
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code OR} and returns {@code true} if at least one is {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichOrCondition extends RichCompositeIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   public RichOrCondition(final IterativeCondition left, final 
IterativeCondition right) {
+   super(left, right);
+   }
+
+   @Override
+   public boolean filter(T value, Context ctx) throws Exception {
+   IterativeCondition left = getLeft();
+   IterativeCondition right = getRight();
+   return left == null || right == null || left.filter(value, ctx) 
|| right.filter(value, ctx);
+   }
 
 Review comment:
   (left != null && left.filter(value, ctx) ) || (right != null  && 
right.filter(value, ctx)) ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694186#comment-16694186
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235246730
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   Sounds good!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694172#comment-16694172
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235244088
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+   private final RuntimeContext runtimeContext;
+
+   public CepRuntimeContext(RuntimeContext runtimeContext) {
+   this.runtimeContext = runtimeContext;
+   }
+
+   @Override
+   public String getTaskName() {
+   return runtimeContext.getTaskName();
+   }
+
+   @Override
+   public MetricGroup getMetricGroup() {
+   return runtimeContext.getMetricGroup();
+   }
+
+   @Override
+   public int getNumberOfParallelSubtasks() {
+   return runtimeContext.getNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getMaxNumberOfParallelSubtasks() {
+   return runtimeContext.getMaxNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getIndexOfThisSubtask() {
+   return runtimeContext.getIndexOfThisSubtask();
+   }
+
+   @Override
+   public int getAttemptNumber() {
+   return runtimeContext.getAttemptNumber();
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return runtimeContext.getTaskNameWithSubtasks();
+   }
+
+   @Override
+   public ExecutionConfig getExecutionConfig() {
+   return runtimeContext.getExecutionConfig();
+   }
+
+   @Override
+   public ClassLoader getUserCodeClassLoader() {
+   return runtimeContext.getUserCodeClassLoader();
+   }
+
+   // 
---
+   // Unsupported operations
+   // 
---
+
+   @Override
+   public  void addAccumulator(
+   String name, Accumulator accumulator) {
+ 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694167#comment-16694167
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235243736
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param  Type of the input elements
+ * @param  Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction
 
 Review comment:
   Make sense. Have added some test cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694165#comment-16694165
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235243602
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichIterativeCondition {
 
 Review comment:
   I think we need to check the null condition.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694162#comment-16694162
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235243458
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   +1 to unify the behavior.  Have updated the PR. I mention the test case 
NFAITCase#testNoConditionNFA is just to explain that we need to consider null. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694163#comment-16694163
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r235243458
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   +1 to unify the behavior.  Have updated the PR. I mention the test case 
NFAITCase#testNoConditionNFA is just to explain that we need to consider null. 
:)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692974#comment-16692974
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234932717
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
 
 Review comment:
   CepRuntimeContext is also needed by RichIterativeCondition which is in 
another package which preventing us to do so.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692965#comment-16692965
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu edited a comment on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-440209346
 
 
   @dawidwys @sunjincheng121 Thanks a lot for your review comments. Have 
updated the PR. 
   
   > I would like to postpone it a little bit though until we figure out if we 
can drop support for deserializing conditions. If so I think we should drop the 
non-rich versions of conditions like And/Or/Not etc.
   
   This means that we will drop the migration support of jobs <= 1.5. Maybe we 
can create another ticket to track that as it also includes other changes such 
as removing NFASerializer etc. Thoughts?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692964#comment-16692964
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-440209346
 
 
   @dawidwys @sunjincheng121 Thanks a lot for your review comments. Have 
updated the PR. 
   
   > I would like to postpone it a little bit though until we figure out if we 
can drop support for deserializing conditions. If so I think we should drop the 
non-rich versions of conditions like And/Or/Not etc.
   This means that we will drop the migration support of jobs <= 1.5. Maybe we 
can create another ticket to track that as it also includes other changes such 
as removing NFASerializer etc. Thoughts?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692870#comment-16692870
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234913483
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+   private final RuntimeContext runtimeContext;
+
+   public CepRuntimeContext(RuntimeContext runtimeContext) {
+   this.runtimeContext = runtimeContext;
+   }
+
+   @Override
+   public String getTaskName() {
+   return runtimeContext.getTaskName();
+   }
+
+   @Override
+   public MetricGroup getMetricGroup() {
+   return runtimeContext.getMetricGroup();
+   }
+
+   @Override
+   public int getNumberOfParallelSubtasks() {
+   return runtimeContext.getNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getMaxNumberOfParallelSubtasks() {
+   return runtimeContext.getMaxNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getIndexOfThisSubtask() {
+   return runtimeContext.getIndexOfThisSubtask();
+   }
+
+   @Override
+   public int getAttemptNumber() {
+   return runtimeContext.getAttemptNumber();
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return runtimeContext.getTaskNameWithSubtasks();
+   }
+
+   @Override
+   public ExecutionConfig getExecutionConfig() {
+   return runtimeContext.getExecutionConfig();
+   }
+
+   @Override
+   public ClassLoader getUserCodeClassLoader() {
+   return runtimeContext.getUserCodeClassLoader();
+   }
+
+   // 
---
+   // Unsupported operations
+   // 
---
+
+   @Override
+   public  void addAccumulator(
+   String name, Accumulator accumulator) 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692860#comment-16692860
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234910601
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   Yes, I see that case, but why can't we let `RichAnd` and `RichOr` have 
default handling for null?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692854#comment-16692854
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234909395
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichIterativeCondition {
 
 Review comment:
   Variable parameters are a good idea! Does `RichNot ` need check null of 
Condition ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692833#comment-16692833
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234906587
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+   private final RuntimeContext runtimeContext;
+
+   public CepRuntimeContext(RuntimeContext runtimeContext) {
+   this.runtimeContext = runtimeContext;
+   }
+
+   @Override
+   public String getTaskName() {
+   return runtimeContext.getTaskName();
+   }
+
+   @Override
+   public MetricGroup getMetricGroup() {
+   return runtimeContext.getMetricGroup();
+   }
+
+   @Override
+   public int getNumberOfParallelSubtasks() {
+   return runtimeContext.getNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getMaxNumberOfParallelSubtasks() {
+   return runtimeContext.getMaxNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getIndexOfThisSubtask() {
+   return runtimeContext.getIndexOfThisSubtask();
+   }
+
+   @Override
+   public int getAttemptNumber() {
+   return runtimeContext.getAttemptNumber();
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return runtimeContext.getTaskNameWithSubtasks();
+   }
+
+   @Override
+   public ExecutionConfig getExecutionConfig() {
+   return runtimeContext.getExecutionConfig();
+   }
+
+   @Override
+   public ClassLoader getUserCodeClassLoader() {
+   return runtimeContext.getUserCodeClassLoader();
+   }
+
+   // 
---
+   // Unsupported operations
+   // 
---
+
+   @Override
+   public  void addAccumulator(
+   String name, Accumulator accumulator) {
+   

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692841#comment-16692841
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234907461
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692842#comment-16692842
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234907686
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
 
 Review comment:
   Could we make this class package protected?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692840#comment-16692840
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234907432
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param  Type of the input elements
+ * @param  Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692839#comment-16692839
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234907384
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
 
 Review comment:
   This is an internal name in CEP library and it follows the naming convention 
of other such classes e.g. `RichAsyncFunctionRuntimeContext`, 
`IterationRuntimeUDFContext` etc. So I would be for using the `RuntimeContext` 
suffix.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692838#comment-16692838
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234907168
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   I think this is by design as CEP support patterns where there are no 
conditions. You can refer to the test case of NFAITCase#testNoConditionNFA. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692824#comment-16692824
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234904578
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichIterativeCondition {
 
 Review comment:
   I though about sth like this:
   
   abstract class RichCompositeCondition extends 
RichIterativeCondition {
   
private IterativeCondition[] conditions;
   
RichCompositeCondition(IterativeCondition... conditions) {
this.conditions = conditions;
}
   
@Override
public void setRuntimeContext(RuntimeContext runtimeContext) {
super.setRuntimeContext(runtimeContext);
   
for (IterativeCondition condition : conditions) {
FunctionUtils.setFunctionRuntimeContext(condition, 
runtimeContext);
}
}
   
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
   
for (IterativeCondition condition : conditions) {
openFunction(condition, parameters);
}
}
   
@Override
public void close() throws Exception {
super.close();
   
for (IterativeCondition condition : conditions) {
closeFunction(condition);
}
}
   }
   
   public class RichAndCondition extends RichCompositeCondition {
   
private static final long serialVersionUID = 1L;
   
private final IterativeCondition left;
private final IterativeCondition right;
   
public RichAndCondition(final IterativeCondition left, final 
IterativeCondition right) {
super(left, right);
this.left = Preconditions.checkNotNull(left, "The condition 
cannot be null.");
this.right = Preconditions.checkNotNull(right, "The condition 
cannot be null.");
   
}
   
@Override
public boolean filter(T value, Context ctx) throws Exception {
return left.filter(value, ctx) && right.filter(value, ctx);
}
   
/**
 * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
 */
public IterativeCondition getLeft() {
return left;
}
   
/**
 * @return One of the {@link IterativeCondition conditions} combined in 
this condition.
 */
public IterativeCondition getRight() {
return right;
}
   }
   
   public class RichNotCondition extends RichCompositeCondition {
   
private static final long serialVersionUID = 1L;
   
private final IterativeCondition original;
   
public RichNotCondition(final IterativeCondition original) {
super(original);
this.original = Preconditions.checkNotNull(original);
}
   
@Override
public boolean filter(T value, Context ctx) throws Exception {
return original != null && 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692768#comment-16692768
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on issue #5080: [FLINK-8159] [cep] Add rich support for 
SelectWrapper and FlatSelectWrapper
URL: https://github.com/apache/flink/pull/5080#issuecomment-440170762
 
 
   Close this PR as it is duplicate with #7110


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692764#comment-16692764
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu closed pull request #5080: [FLINK-8159] [cep] Add rich support for 
SelectWrapper and FlatSelectWrapper
URL: https://github.com/apache/flink/pull/5080
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
index 4423bb1dd40..95225271b9e 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -20,7 +20,8 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
@@ -28,6 +29,7 @@
 import org.apache.flink.cep.PatternFlatTimeoutFunction;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;
 
@@ -102,7 +104,7 @@ protected void processTimedOutSequences(
 * in one udf.
 */
@Internal
-   public static class FlatSelectWrapper implements 
Function {
+   public static class FlatSelectWrapper extends 
AbstractRichFunction {
 
private static final long serialVersionUID = 
-8320546120157150202L;
 
@@ -125,5 +127,25 @@ public FlatSelectWrapper(
this.flatSelectFunction = flatSelectFunction;
this.flatTimeoutFunction = flatTimeoutFunction;
}
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   if (flatSelectFunction instanceof RichFunction) {
+   ((RichFunction) 
flatSelectFunction).open(parameters);
+   }
+   if (flatTimeoutFunction instanceof RichFunction) {
+   ((RichFunction) 
flatTimeoutFunction).open(parameters);
+   }
+   }
+
+   @Override
+   public void close() throws Exception {
+   if (flatSelectFunction instanceof RichFunction) {
+   ((RichFunction) flatSelectFunction).close();
+   }
+   if (flatTimeoutFunction instanceof RichFunction) {
+   ((RichFunction) flatTimeoutFunction).close();
+   }
+   }
}
 }
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
index cb233a486ec..18a1454cf4c 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -19,7 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
@@ -27,6 +28,7 @@
 import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -54,8 +56,8 @@ public SelectTimeoutCepOperator(
NFACompiler.NFAFactory nfaFactory,
final EventComparator comparator,
AfterMatchSkipStrategy skipStrategy,
-   

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692740#comment-16692740
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234878521
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+   private final RuntimeContext runtimeContext;
+
+   public CepRuntimeContext(RuntimeContext runtimeContext) {
+   this.runtimeContext = runtimeContext;
+   }
+
+   @Override
+   public String getTaskName() {
+   return runtimeContext.getTaskName();
+   }
+
+   @Override
+   public MetricGroup getMetricGroup() {
+   return runtimeContext.getMetricGroup();
+   }
+
+   @Override
+   public int getNumberOfParallelSubtasks() {
+   return runtimeContext.getNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getMaxNumberOfParallelSubtasks() {
+   return runtimeContext.getMaxNumberOfParallelSubtasks();
+   }
+
+   @Override
+   public int getIndexOfThisSubtask() {
+   return runtimeContext.getIndexOfThisSubtask();
+   }
+
+   @Override
+   public int getAttemptNumber() {
+   return runtimeContext.getAttemptNumber();
+   }
+
+   @Override
+   public String getTaskNameWithSubtasks() {
+   return runtimeContext.getTaskNameWithSubtasks();
+   }
+
+   @Override
+   public ExecutionConfig getExecutionConfig() {
+   return runtimeContext.getExecutionConfig();
+   }
+
+   @Override
+   public ClassLoader getUserCodeClassLoader() {
+   return runtimeContext.getUserCodeClassLoader();
+   }
+
+   // 
---
+   // Unsupported operations
+   // 
---
+
+   @Override
+   public  void addAccumulator(
+   String name, Accumulator accumulator) 

[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692742#comment-16692742
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234882983
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichIterativeCondition {
 
 Review comment:
   I agree to deal with the open and close methods in a way that adds abstract 
classes, and RichAnd is very similar to RichOr, but how do we deal with the 
difference between RichNot and RichAnd/RichOr?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692741#comment-16692741
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234885267
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link 
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param  Type of the input elements
+ * @param  Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction
 
 Review comment:
   Add the test case in `CEPITCase`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692739#comment-16692739
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234885688
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it 
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichNotCondition extends RichIterativeCondition {
+
+   private static final long serialVersionUID = 1L;
+
+   private final IterativeCondition original;
+
+   public RichNotCondition(final IterativeCondition original) {
+   this.original = original;
 
 Review comment:
   I think we should unify the default behavior for null in `RichOr` , 
`RichAnd` and `RichNot` What do you think? @dianfu @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692738#comment-16692738
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

sunjincheng121 commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234883989
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
 ##
 @@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition 
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state 
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
 
 Review comment:
   From the points of my view, I like change `CepRuntimeContext` to 
`CepFunctionContext`, Because we only need to provide the functions which will 
be using in the user-defined function, it will not expose many runtime 
functions, such as: `getXXXState`  low-level operations. But i am fine if you 
want using `CepRuntimeContext` , What do you think  @dawidwys @dianfu 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16691744#comment-16691744
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dawidwys commented on a change in pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#discussion_r234631211
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
 ##
 @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions 
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param  Type of the element to filter
+ */
+public class RichAndCondition extends RichIterativeCondition {
 
 Review comment:
   How about we introduce `RichCompositeIterativeCondition` that will get list 
of conditions in the ctor and will handle the `open/close/setRuntimeContext` 
methods in one place?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688910#comment-16688910
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu commented on issue #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110#issuecomment-439258573
 
 
   Hi @dawidwys, could you help to take a look at this PR? Thanks a lot.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688179#comment-16688179
 ] 

ASF GitHub Bot commented on FLINK-8159:
---

dianfu opened a new pull request #7110: [FLINK-8159] [cep] 
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110
 
 
   [FLINK-6938] [cep] IterativeCondition should support RichFunction interface
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-14 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16687524#comment-16687524
 ] 

Dian Fu commented on FLINK-8159:


Thanks a lot for the reply and advice. I will submit a PR later today.

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-11-14 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686705#comment-16686705
 ] 

Dawid Wysakowicz commented on FLINK-8159:
-

Ad. 1 Agree with that approach with adding additional abstract class. But my 
original question was about using a new {{PatternSelectFunction}}, but I don't 
think we should support dynamically changing it, that should be possible only 
for {{IterativeCondition}}
Ad. 2 Agreed
Ad. 3 I know we could, but I don't think we should expose everything. actually 
I would limit its capabilities to a minimum, e.g. prohibit state creation, 
accumulators etc. This is of great importance especially for 
{{IterativeCondition}}

I think as we have somewhat similar view on that issue, I think we could start 
with the implementation if you are still interested ;). Also last but not least 
sorry for the late response.

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-10-24 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662091#comment-16662091
 ] 

Dian Fu commented on FLINK-8159:


Thanks [~dawidwys] for your feedback.

1. I prefer adding an abstract class RichPattern(Flat)SelectFunction which 
implements Pattern(Flat)SelectFunction for the following reasons: 
  1) consist with the way MapFunction/RichMapFunction and all other rich 
functions take
  2) there are no backwards compatibility issues at all
  3) If we update Pattern(Flat)SelectFunction to 
RichPattern(Flat)SelectFunction, I think the backwards compatibility issues can 
not be solved at all as the old class doesn't exist yet.
2. We can create a class named WrappingRichPattern(Flat)SelectFunction and 
holding an instance of it in AbstractKeyedCEPPatternOperator. When NFA updated, 
we create a new RichPattern(Flat)SelectFunction, open it and set it as the 
currently wrapped function.
3. The RichPattern(Flat)SelectFunction is an child class of 
AbstractRichFunction and so RuntimeContext is exposed publicly.

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-10-24 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16661863#comment-16661863
 ] 

Dawid Wysakowicz commented on FLINK-8159:
-

Hi [~dian.fu],
I would like to see a bit more insights.
# Which version do you think is better? Updating the select function or not? 
Why?
# How do you want to pass parameters to open() on condtions for new NFA?
# To what extent do you want to provide access to RuntimeContext?

Sorry, I cannot fully participate in the design of this feature, but we are 
currently in the process of preparing the 1.7 release. I might be more 
responsive after this is done.

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8159) Pattern(Flat)SelectFunctions should support RichFunction interface

2018-10-19 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656534#comment-16656534
 ] 

Dian Fu commented on FLINK-8159:


Hi [~dawidwys], 
For the concern #1 you described:
my init thoughts was to add an abstract class RichPattern(Flat)SelectFunction 
and leave Pattern(Flat)SelectFunction as it is (follow the same way as 
Function/RichFunction). So I think there will be no backwards compatibility 
issues.


For the concern#2:
I'm still trying to understand the problem. In my mind, there are two kinds of 
functions to consider: IterativeCondition and the PatternSelectFunction.
When new pattern(which contains IterativeCondition) comes (may be 
PatternSelectFunction will also be dynamically changed?):
For IterativeCondition, we need to recompile the NFA and call the open method 
of the IterativeConditions which contained in the NFA. 
For PatternSelectFunction, if it's updated, we just call the open method of the 
newly come PatternSelectFunction and set it as the currently used function. If 
not, we don't need do anything.

Thoughts?

> Pattern(Flat)SelectFunctions should support RichFunction interface
> --
>
> Key: FLINK-8159
> URL: https://issues.apache.org/jira/browse/FLINK-8159
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)