[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074518#comment-16074518
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4153


> Support pattern group in CEP
>
> Key: FLINK-6927
> URL: https://issues.apache.org/jira/browse/FLINK-6927
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> We should add support for pattern groups. This would enrich the set of
> supported patterns. For example, with this feature available, users could
> write patterns like this:
> {code}
>  A --> (B --> C.times(3)).optional() --> D
> {code}
> or
> {code}
> A --> (B --> C).times(3) --> D
> {code}
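
One way to read the two example patterns above is to expand each group into the flat sequences of states it stands for. The following is a minimal, self-contained Java sketch of that reading only. It does not use the Flink CEP API: the class `GroupPatternSketch` and its helpers `seq`, `times`, and `withOptional` are hypothetical names introduced for illustration, and the sketch ignores contiguity and skip semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Flink CEP.
final class GroupPatternSketch {

    /** Concatenates the given parts into one flat sequence of state names. */
    @SafeVarargs
    static List<String> seq(List<String>... parts) {
        List<String> out = new ArrayList<>();
        for (List<String> part : parts) {
            out.addAll(part);
        }
        return out;
    }

    /** Repeats a whole group n times, mimicking (group).times(n). */
    static List<String> times(List<String> group, int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.addAll(group);
        }
        return out;
    }

    /** Expands (group).optional(): one alternative without the group, one with it. */
    static List<List<String>> withOptional(List<String> prefix, List<String> group, List<String> suffix) {
        List<List<String>> alternatives = new ArrayList<>();
        alternatives.add(seq(prefix, suffix));
        alternatives.add(seq(prefix, group, suffix));
        return alternatives;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("A");
        List<String> d = Arrays.asList("D");

        // A --> (B --> C).times(3) --> D
        System.out.println(seq(a, times(Arrays.asList("B", "C"), 3), d));

        // A --> (B --> C.times(3)).optional() --> D
        List<String> inner = seq(Arrays.asList("B"), times(Arrays.asList("C"), 3));
        System.out.println(withOptional(a, inner, d));
    }
}
```

The point of the sketch is the difference in scope: in the second pattern from the issue the quantifier applies to the whole group `(B --> C)`, while in the first it applies only to `C` inside an optional group.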



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073076#comment-16073076
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dawidwys Thanks a lot for the review. Updated the doc.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072489#comment-16072489
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dianfu Thanks for the update. The code looks really nice now. The 
only thing missing from this PR is the docs. Could you please add a 
section about the group patterns?




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071577#comment-16071577
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dawidwys Thanks a lot for your comments. I have updated the PR, which 
should address all of them. :)




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071576#comment-16071576
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125178823
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
--- End diff --

updated.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071546#comment-16071546
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176853
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+	@Test
+	public void testGroupFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event c = new Event(40, "c", 1.0);
+		Event a1 = new Event(41, "a", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event a2 = new Event(43, "a", 4.0);
+		Event b2 = new Event(44, "b", 5.0);
+		Event d = new Event(45, "d", 6.0);
+
+		inputEvents.add(new StreamRecord<>(c, 1));
+		inputEvents.add(new StreamRecord<>(a1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(a2, 4));
+		inputEvents.add(new StreamRecord<>(b2, 5));
+		inputEvents.add(new StreamRecord<>(d, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedBy("middle2").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(c, a1, b1, a2, b2, d)
+   

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071545#comment-16071545
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176801
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071540#comment-16071540
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176457
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071538#comment-16071538
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176384
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
--- End diff --

done




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071537#comment-16071537
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125176383
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071525#comment-16071525
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173912
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
return this;
}
 
+	/**
+	 * Starts a new pattern sequence. The provided pattern is the initial pattern
+	 * of the new sequence.
+	 *
+	 * @param group the pattern to begin with
+	 * @return the first pattern of a pattern sequence
+	 */
+	public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
+		return new GroupPattern<>(null, group);
+	}
+
+	/**
+	 * Appends a new pattern to the existing one. The new pattern enforces non-strict
--- End diff --

done




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071523#comment-16071523
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173876
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -153,9 +153,8 @@ public int hashCode() {
private final int to;
 
private Times(int from, int to) {
-			Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+			Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
--- End diff --

done.
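
The tightened precondition in the diff above can be sketched in isolation. This is a minimal stand-alone Java example, not the Flink `Quantifier.Times` class: `TimesSketch` is a hypothetical name, a plain `IllegalArgumentException` stands in for what `Preconditions.checkArgument` throws, and the second check (`to >= from`) is an assumption that does not appear in the quoted hunk.

```java
// Hypothetical illustration only; not the Flink Quantifier.Times class.
final class TimesSketch {
    private final int from;
    private final int to;

    private TimesSketch(int from, int to) {
        // Mirrors the changed check from the diff: zero is no longer accepted.
        if (from <= 0) {
            throw new IllegalArgumentException("The from should be a positive number greater than 0.");
        }
        // Assumption: the range must not be inverted (not shown in the quoted hunk).
        if (to < from) {
            throw new IllegalArgumentException(
                "The to should be a number greater than or equal to from: " + from + ".");
        }
        this.from = from;
        this.to = to;
    }

    /** Range form, e.g. between 2 and 4 occurrences. */
    static TimesSketch of(int from, int to) {
        return new TimesSketch(from, to);
    }

    /** Exact form, e.g. exactly 3 occurrences. */
    static TimesSketch of(int times) {
        return new TimesSketch(times, times);
    }

    int getFrom() {
        return from;
    }

    int getTo() {
        return to;
    }

    public static void main(String[] args) {
        TimesSketch range = TimesSketch.of(2, 4);
        System.out.println(range.getFrom() + ".." + range.getTo());
        try {
            TimesSketch.of(0, 3);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this reading, a quantifier that starts at zero has to be expressed some other way, for example by marking the pattern as optional, rather than by passing `from = 0`.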




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071524#comment-16071524
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173877
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
 ---
@@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
}
 
@Test
+   public void testTimesRangeFromZero() {
--- End diff --

done




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071522#comment-16071522
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173871
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
 ---
@@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
 		if (usedNames.contains(name)) {
 			throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
 		}
+		usedNames.add(name);
+	}
+
--- End diff --

updated
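
The check-then-register behaviour shown in the hunk above can be sketched on its own. This is a minimal stand-alone Java example under stated assumptions: `StateNameRegistrySketch` is a hypothetical class, not the Flink `NFAStateNameHandler`, and `IllegalStateException` stands in for `MalformedPatternException` so that the example needs no Flink dependency.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not the Flink NFAStateNameHandler.
final class StateNameRegistrySketch {
    private final Set<String> usedNames = new HashSet<>();

    /**
     * Rejects a name that was already registered and otherwise records it,
     * so that a second call with the same name fails.
     */
    void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            // Stand-in for MalformedPatternException in the quoted diff.
            throw new IllegalStateException(
                "Duplicate pattern name: " + name + ". Names must be unique.");
        }
        usedNames.add(name);
    }

    public static void main(String[] args) {
        StateNameRegistrySketch registry = new StateNameRegistrySketch();
        registry.checkNameUniqueness("start");
        registry.checkNameUniqueness("middle1");
        try {
            registry.checkNameUniqueness("start");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Recording the name inside the check means callers cannot forget the registration step, which matters once names can come from nested group patterns as well as from the top-level sequence.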




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071520#comment-16071520
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173828
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
}
 
/**
+* Create all the states for the group pattern.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @param proceedState the state that the group pattern being converted should proceed to
+* @param isOptional whether the group pattern being converted is optional
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState,
+   final State<T> proceedState,
+   final boolean isOptional) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
--- End diff --

updated.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071521#comment-16071521
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173832
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
}
 
/**
+* Create all the states for the group pattern.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @param proceedState the state that the group pattern being converted should proceed to
+* @param isOptional whether the group pattern being converted is optional
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState,
+   final State<T> proceedState,
+   final boolean isOptional) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
+   State<T> lastSink = sinkState;
+   currentGroupPattern = groupPattern;
+   currentPattern = groupPattern.getRawPattern();
+   lastSink = createMiddleStates(lastSink);
+   lastSink = convertPattern(lastSink);
+   if (isOptional) {
+   // for the first state of a group pattern, its PROCEED edge should point to
+   // the following state of that group pattern
+   lastSink.addProceed(proceedState, trueFunction);
+   }
+   return lastSink;
+   } finally {
+   currentPattern = oldCurrentPattern;
+   followingPattern = oldFollowingPattern;
+   currentGroupPattern = oldGroupPattern;
+   }
+   }
+
+   /**
+* Create the states for the group pattern as a looping one.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createLoopingGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
--- End diff --

updated.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16071519#comment-16071519
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125173760
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
checkIfNoNotPattern();
checkIfQuantifierApplied();
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-   if (from == 0) {
--- End diff --

Thanks for the suggestion, created PR: 
https://github.com/apache/flink/pull/4242




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069900#comment-16069900
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005796
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
checkIfNoNotPattern();
checkIfQuantifierApplied();
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
-   if (from == 0) {
--- End diff --

Could you submit the change to TimesRange in a separate JIRA/PR? Let's make 
one change at a time.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069902#comment-16069902
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r124994181
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
 ---
@@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
if (usedNames.contains(name)) {
throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
}
+   usedNames.add(name);
+   }
+
--- End diff --

javadoc missing
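
A minimal sketch of the check-then-register behaviour this hunk introduces, with Javadoc in the style being asked for. `NameRegistry` is a hypothetical stand-in built on the standard library only, and `IllegalArgumentException` stands in for Flink's `MalformedPatternException`; neither is taken from the pull request.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a handler that tracks used pattern names. */
final class NameRegistry {
    private final Set<String> usedNames = new HashSet<>();

    /**
     * Checks that the given name is unused and, if so, registers it.
     *
     * @param name the name to check and register
     * @throws IllegalArgumentException if the name was registered before
     */
    void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            throw new IllegalArgumentException(
                "Duplicate pattern name: " + name + ". Names must be unique.");
        }
        usedNames.add(name);
    }

    /** Forgets all names registered so far. */
    void clear() {
        usedNames.clear();
    }
}
```

Registering the same name twice is rejected on the second call; after `clear()` the name can be registered again.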




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069905#comment-16069905
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125009258
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event b1 = new Event(42, "b", 3.0);
+   Event a2 = new Event(43, "a", 4.0);
+   Event b2 = new Event(44, "b", 5.0);
+   Event d = new Event(45, "d", 6.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(b1, 3));
+   inputEvents.add(new StreamRecord<>(a2, 4));
+   inputEvents.add(new StreamRecord<>(b2, 5));
+   inputEvents.add(new StreamRecord<>(d, 6));
+
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).followedBy("middle2").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("b");
+   }
+   })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, b1, a2, b2, d)
+ 

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069898#comment-16069898
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005277
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
return this;
}
 
+   /**
+* Starts a new pattern sequence. The provided pattern is the initial pattern
+* of the new sequence.
+*
+* @param group the pattern to begin with
+* @return the first pattern of a pattern sequence
+*/
+   public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
+   return new GroupPattern<>(null, group);
+   }
+
+   /**
+* Appends a new pattern to the existing one. The new pattern enforces non-strict
--- End diff --

I would update the docs to stress that this method is used for 
appending a group pattern.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069897#comment-16069897
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r124999092
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
}
 
/**
+* Create all the states for the group pattern.
+*
+* @param groupPattern the group pattern to create the states for
+* @param sinkState the state that the group pattern being converted should point to
+* @param proceedState the state that the group pattern being converted should proceed to
+* @param isOptional whether the group pattern being converted is optional
+* @return the first state of the states of the group pattern
+*/
+   private State<T> createGroupPatternState(
+   final GroupPattern<T, ?> groupPattern,
+   final State<T> sinkState,
+   final State<T> proceedState,
+   final boolean isOptional) {
+   final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+   Pattern<T, ?> oldCurrentPattern = currentPattern;
+   Pattern<T, ?> oldFollowingPattern = followingPattern;
+   GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+   try {
--- End diff --

What is the point of this `try` block? Why not:

private State<T> createGroupPatternState(
        final GroupPattern<T, ?> groupPattern,
        final State<T> sinkState,
        final State<T> proceedState,
        final boolean isOptional) {
    final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();

    Pattern<T, ?> oldCurrentPattern = currentPattern;
    Pattern<T, ?> oldFollowingPattern = followingPattern;
    GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    State<T> lastSink = sinkState;
    currentGroupPattern = groupPattern;
    currentPattern = groupPattern.getRawPattern();
    lastSink = createMiddleStates(lastSink);
    lastSink = convertPattern(lastSink);
    if (isOptional) {
        // for the first state of a group pattern, its PROCEED edge should point to
        // the following state of that group pattern
        lastSink.addProceed(proceedState, trueFunction);
    }
    currentPattern = oldCurrentPattern;
    followingPattern = oldFollowingPattern;
    currentGroupPattern = oldGroupPattern;
    return lastSink;
}
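
For reference, the observable difference between the two variants shows up only when the conversion throws. The sketch below uses hypothetical names (`CompilerState`, `convertWithFinally`, `convertWithoutFinally`), none of which are Flink code, and relies on the standard library only. Whether the difference matters for `NFACompiler` depends on whether the compiler is reused after a failure, which this thread does not establish.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a compiler that tracks its position in a mutable field. */
final class CompilerState {
    String currentPattern = "outer";

    /** Restores currentPattern on both the normal and the exceptional path. */
    String convertWithFinally(String groupPattern, Supplier<String> body) {
        String oldCurrentPattern = currentPattern;
        try {
            currentPattern = groupPattern;
            return body.get();
        } finally {
            currentPattern = oldCurrentPattern;
        }
    }

    /** Restores currentPattern only when the body returns normally. */
    String convertWithoutFinally(String groupPattern, Supplier<String> body) {
        String oldCurrentPattern = currentPattern;
        currentPattern = groupPattern;
        String result = body.get();
        currentPattern = oldCurrentPattern;
        return result;
    }
}
```

If `body` throws, the first variant leaves `currentPattern` at its old value, while the second leaves it pointing at the group pattern.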




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069908#comment-16069908
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125010592
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event b1 = new Event(42, "b", 3.0);
+   Event a2 = new Event(43, "a", 4.0);
+   Event b2 = new Event(44, "b", 5.0);
+   Event d = new Event(45, "d", 6.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(b1, 3));
+   inputEvents.add(new StreamRecord<>(a2, 4));
+   inputEvents.add(new StreamRecord<>(b2, 5));
+   inputEvents.add(new StreamRecord<>(d, 6));
+
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).followedBy("middle2").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("b");
+   }
+   })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, b1, a2, b2, d)
+ 

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069901#comment-16069901
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125012573
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
--- End diff --

I am missing a test or two for `... followedByAny(...).oneOrMore()`.
A test or two with `notNext` and `notFollowedBy` after group patterns 
would also be helpful, I think.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069907#comment-16069907
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125009695
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event b1 = new Event(42, "b", 3.0);
+   Event a2 = new Event(43, "a", 4.0);
+   Event b2 = new Event(44, "b", 5.0);
+   Event d = new Event(45, "d", 6.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(b1, 3));
+   inputEvents.add(new StreamRecord<>(a2, 4));
+   inputEvents.add(new StreamRecord<>(b2, 5));
+   inputEvents.add(new StreamRecord<>(d, 6));
+
+   Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).followedBy("middle2").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("b");
+   }
+   })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+   private static final long serialVersionUID = 5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+   final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+   Lists.newArrayList(c, a1, b1, a2, b2, d)
+ 

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069904#comment-16069904
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125009025
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
--- End diff --

`testGroupFollowedBy` -> `testGroupFollowedByTimes`




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069909#comment-16069909
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125012236
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
 ---
@@ -0,0 +1,807 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.GroupPattern;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link GroupPattern}.
+ */
+@SuppressWarnings("unchecked")
+public class GroupITCase extends TestLogger {
+
+   @Test
+   public void testGroupFollowedBy() {
+       List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+       Event c = new Event(40, "c", 1.0);
+       Event a1 = new Event(41, "a", 2.0);
+       Event b1 = new Event(42, "b", 3.0);
+       Event a2 = new Event(43, "a", 4.0);
+       Event b2 = new Event(44, "b", 5.0);
+       Event d = new Event(45, "d", 6.0);
+
+       inputEvents.add(new StreamRecord<>(c, 1));
+       inputEvents.add(new StreamRecord<>(a1, 2));
+       inputEvents.add(new StreamRecord<>(b1, 3));
+       inputEvents.add(new StreamRecord<>(a2, 4));
+       inputEvents.add(new StreamRecord<>(b2, 5));
+       inputEvents.add(new StreamRecord<>(d, 6));
+
+       Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+           private static final long serialVersionUID = 5726188262756267490L;
+
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("c");
+           }
+       }).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
+           private static final long serialVersionUID = 5726188262756267490L;
+
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("a");
+           }
+       }).followedBy("middle2").where(new SimpleCondition<Event>() {
+           private static final long serialVersionUID = 5726188262756267490L;
+
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("b");
+           }
+       })).times(2).followedBy("end").where(new SimpleCondition<Event>() {
+           private static final long serialVersionUID = 5726188262756267490L;
+
+           @Override
+           public boolean filter(Event value) throws Exception {
+               return value.getName().equals("d");
+           }
+       });
+
+       NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+       final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+       compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+           Lists.newArrayList(c, a1, b1, a2, b2, d)
+ 

[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069906#comment-16069906
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005984
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
    }
 
    /**
+    * Create all the states for the group pattern.
+    *
+    * @param groupPattern the group pattern to create the states for
+    * @param sinkState the state that the group pattern being converted should point to
+    * @param proceedState the state that the group pattern being converted should proceed to
+    * @param isOptional whether the group pattern being converted is optional
+    * @return the first state of the states of the group pattern
+    */
+   private State<T> createGroupPatternState(
+       final GroupPattern<T, ?> groupPattern,
+       final State<T> sinkState,
+       final State<T> proceedState,
+       final boolean isOptional) {
+       final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+       Pattern<T, ?> oldCurrentPattern = currentPattern;
+       Pattern<T, ?> oldFollowingPattern = followingPattern;
+       GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+       try {
+           State<T> lastSink = sinkState;
+           currentGroupPattern = groupPattern;
+           currentPattern = groupPattern.getRawPattern();
+           lastSink = createMiddleStates(lastSink);
+           lastSink = convertPattern(lastSink);
+           if (isOptional) {
+               // for the first state of a group pattern, its PROCEED edge should point to
+               // the following state of that group pattern
+               lastSink.addProceed(proceedState, trueFunction);
+           }
+           return lastSink;
+       } finally {
+           currentPattern = oldCurrentPattern;
+           followingPattern = oldFollowingPattern;
+           currentGroupPattern = oldGroupPattern;
+       }
+   }
+
+   /**
+    * Create the states for the group pattern as a looping one.
+    *
+    * @param groupPattern the group pattern to create the states for
+    * @param sinkState the state that the group pattern being converted should point to
+    * @return the first state of the states of the group pattern
+    */
+   private State<T> createLoopingGroupPatternState(
+       final GroupPattern<T, ?> groupPattern,
+       final State<T> sinkState) {
+       final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+
+       Pattern<T, ?> oldCurrentPattern = currentPattern;
+       Pattern<T, ?> oldFollowingPattern = followingPattern;
+       GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
+       try {
--- End diff --

Same as above. Why `try`?
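
For context on the question above: the `try`/`finally` in the diff follows the usual save-and-restore idiom for mutable fields. The sketch below is a minimal plain-Java illustration, not the NFACompiler code; `SaveRestoreSketch`, `compileGroup` and the `fail` flag are hypothetical names. It only shows what the construct guarantees (the field is restored even when the body throws). Whether that guarantee is needed in `createGroupPatternState` remains the open review question.

```java
import java.util.ArrayList;
import java.util.List;

final class SaveRestoreSketch {
    // Hypothetical stand-in for a compiler field such as currentPattern.
    private String currentPattern = "outer";

    // Records which pattern was "current" while each group was processed.
    final List<String> visited = new ArrayList<>();

    String getCurrentPattern() {
        return currentPattern;
    }

    // Temporarily switches the field to the group, then restores it.
    void compileGroup(String groupPattern, boolean fail) {
        String oldCurrentPattern = currentPattern;
        try {
            currentPattern = groupPattern;
            visited.add(currentPattern);
            if (fail) {
                throw new IllegalStateException("malformed group: " + groupPattern);
            }
        } finally {
            // Runs on normal return and on exception alike.
            currentPattern = oldCurrentPattern;
        }
    }

    public static void main(String[] args) {
        SaveRestoreSketch sketch = new SaveRestoreSketch();
        sketch.compileGroup("group", false);
        System.out.println(sketch.getCurrentPattern());
        try {
            sketch.compileGroup("bad", true);
        } catch (IllegalStateException e) {
            // The field has still been restored at this point.
            System.out.println(sketch.getCurrentPattern());
        }
    }
}
```

Without the `finally`, the restore in this sketch would be skipped whenever the body throws; if the body cannot throw, plain assignments after it would behave the same.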




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069899#comment-16069899
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005837
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -153,9 +153,8 @@ public int hashCode() {
private final int to;
 
private Times(int from, int to) {
-   Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+   Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
--- End diff --

Move to another JIRA/PR




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069903#comment-16069903
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4153#discussion_r125005892
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
 ---
@@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
}
 
@Test
+   public void testTimesRangeFromZero() {
--- End diff --

Move to another JIRA/PR




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068531#comment-16068531
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4153
  
Hi @dianfu and @dawidwys,

I think that we should stick to the current API because:
1) a new API would raise serious backwards-compatibility concerns,
2) people have already started using the current API, and
3) there are two parallel efforts going on (SQL and standalone CEP), and we should not block one on the other.

If we agree on this, then this PR is ready for review, @dawidwys.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068013#comment-16068013
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dianfu Sorry, I have not reviewed it yet, but I do think this feature would 
benefit from the rework of the Pattern API that I proposed in 
[FLINK-3414](https://issues.apache.org/jira/browse/FLINK-3414).

Instead of checking for the head/tail of a group pattern and caching them, we 
could more or less reuse the current code for plain sequence creation, which 
could return begin/end states. The code for joining groups would then be much 
simpler, because it would operate on already translated sequences.
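
To make the idea concrete, here is a minimal plain-Java sketch of composing already translated sequences through their begin/end states. It is an illustration under stated assumptions, not the proposed Flink API: `State`, `Fragment`, `single`, `sequence`, `times` and `path` are all hypothetical names, and conditions, PROCEED/IGNORE edges and optionality are left out. It builds the `A --> (B --> C).times(3) --> D` example from the issue description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class FragmentSketch {
    // A named state with outgoing edges; purely illustrative.
    static final class State {
        final String name;
        final List<State> next = new ArrayList<>();

        State(String name) {
            this.name = name;
        }
    }

    // A translated sequence exposes only its first and last state.
    static final class Fragment {
        final State begin;
        final State end;

        Fragment(State begin, State end) {
            this.begin = begin;
            this.end = end;
        }
    }

    // Translates a single pattern into a one-state fragment.
    static Fragment single(String name) {
        State state = new State(name);
        return new Fragment(state, state);
    }

    // Joins two fragments by linking the end of the first to the begin of the second.
    static Fragment sequence(Fragment first, Fragment second) {
        first.end.next.add(second.begin);
        return new Fragment(first.begin, second.end);
    }

    // Repeats a group n times by translating n fresh copies and chaining them.
    static Fragment times(Supplier<Fragment> group, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        Fragment result = group.get();
        for (int i = 1; i < n; i++) {
            result = sequence(result, group.get());
        }
        return result;
    }

    // Follows the first outgoing edge from begin to end and collects the state names.
    static List<String> path(Fragment fragment) {
        List<String> names = new ArrayList<>();
        State state = fragment.begin;
        names.add(state.name);
        while (state != fragment.end) {
            state = state.next.get(0);
            names.add(state.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // A --> (B --> C).times(3) --> D
        Fragment group = times(() -> sequence(single("B"), single("C")), 3);
        Fragment all = sequence(sequence(single("A"), group), single("D"));
        System.out.println(path(all)); // prints [A, B, C, B, C, B, C, D]
    }
}
```

Because every operation takes and returns a `Fragment`, joining a group needs no knowledge of what is inside it, which is the simplification the paragraph above argues for.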

Also, I think the new API would make 
[FLINK-4641](https://issues.apache.org/jira/browse/FLINK-4641)
much easier.

As it would require an API rework, I would really like to hear @kl0u's opinion. 
If we agree not to change the API, though, I will go straight to reviewing this 
PR.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065826#comment-16065826
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
Rebased the code. @dawidwys @kl0u, could you take a look at this PR?




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058676#comment-16058676
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

Github user dianfu commented on the issue:

https://github.com/apache/flink/pull/4153
  
@dawidwys @kl0u It would be great if you could take a look at this PR.




[jira] [Commented] (FLINK-6927) Support pattern group in CEP

2017-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057193#comment-16057193
 ] 

ASF GitHub Bot commented on FLINK-6927:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4153

[FLINK-6927] [cep] Support pattern group in CEP

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-6927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4153


commit 6b295b82ddcda542ef0832bdf8405c4bad975882
Author: Dian Fu 
Date:   2017-06-21T08:41:21Z

[FLINK-6927] [cep] Support pattern group in CEP



