[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135441351
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -421,6 +437,15 @@ private void addStopStateToLooping(final State 
loopingState) {
untilCondition,
true);
 
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)
 &&
+   times.getFrom() != times.getTo()) {
+   if (untilCondition != null) {
+   State sinkStateCopy = 
copy(sinkState);
+   
originalStateMap.put(sinkState.getName(), sinkStateCopy);
--- End diff --

originalStateMap is used when compiling the NFA and it will be collected 
after NFA is created and so I think it's unnecessary to clear the entries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135440842
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern 
pattern) {
return createGroupPatternState((GroupPattern) 
currentPattern, sinkState, proceedState, isOptional);
}
 
-   final IterativeCondition trueFunction = 
getTrueFunction();
-
final State singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
// if event is accepted then all notPatterns previous 
to the optional states are no longer valid
final State sink = 
copyWithoutTransitiveNots(sinkState);
singletonState.addTake(sink, takeCondition);
 
+   // if no element accepted the previous nots are still 
valid.
+   final IterativeCondition proceedCondition = 
getTrueFunction();
+
// for the first state of a group pattern, its PROCEED 
edge should point to the following state of
// that group pattern and the edge will be added at the 
end of creating the NFA for that group pattern
if (isOptional && !headOfGroup(currentPattern)) {
-   // if no element accepted the previous nots are 
still valid.
-   singletonState.addProceed(proceedState, 
trueFunction);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   final IterativeCondition 
untilCondition =
+   (IterativeCondition) 
currentPattern.getUntilCondition();
+   if (untilCondition != null) {
+   singletonState.addProceed(
+   
originalStateMap.get(proceedState.getName()),
+   new 
AndCondition<>(proceedCondition, untilCondition));
--- End diff --

When untilCondition holds, the loop should break and the state should 
proceed to the next state. This is covered by the test case 
GreedyITCase#testGreedyUntilWithDummyEventsBeforeQuantifier.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-25 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135367240
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern 
pattern) {
return createGroupPatternState((GroupPattern) 
currentPattern, sinkState, proceedState, isOptional);
}
 
-   final IterativeCondition trueFunction = 
getTrueFunction();
-
final State singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
// if event is accepted then all notPatterns previous 
to the optional states are no longer valid
final State sink = 
copyWithoutTransitiveNots(sinkState);
singletonState.addTake(sink, takeCondition);
 
+   // if no element accepted the previous nots are still 
valid.
+   final IterativeCondition proceedCondition = 
getTrueFunction();
+
// for the first state of a group pattern, its PROCEED 
edge should point to the following state of
// that group pattern and the edge will be added at the 
end of creating the NFA for that group pattern
if (isOptional && !headOfGroup(currentPattern)) {
-   // if no element accepted the previous nots are 
still valid.
-   singletonState.addProceed(proceedState, 
trueFunction);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   final IterativeCondition 
untilCondition =
+   (IterativeCondition) 
currentPattern.getUntilCondition();
+   if (untilCondition != null) {
+   singletonState.addProceed(
+   
originalStateMap.get(proceedState.getName()),
+   new 
AndCondition<>(proceedCondition, untilCondition));
--- End diff --

Why is this not wrapped with NotCondition ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-25 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135366609
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -526,18 +551,32 @@ private boolean isPatternOptional(Pattern 
pattern) {
return createGroupPatternState((GroupPattern) 
currentPattern, sinkState, proceedState, isOptional);
}
 
-   final IterativeCondition trueFunction = 
getTrueFunction();
-
final State singletonState = 
createState(currentPattern.getName(), State.StateType.Normal);
// if event is accepted then all notPatterns previous 
to the optional states are no longer valid
final State sink = 
copyWithoutTransitiveNots(sinkState);
singletonState.addTake(sink, takeCondition);
 
+   // if no element accepted the previous nots are still 
valid.
+   final IterativeCondition proceedCondition = 
getTrueFunction();
+
// for the first state of a group pattern, its PROCEED 
edge should point to the following state of
// that group pattern and the edge will be added at the 
end of creating the NFA for that group pattern
if (isOptional && !headOfGroup(currentPattern)) {
-   // if no element accepted the previous nots are 
still valid.
-   singletonState.addProceed(proceedState, 
trueFunction);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   final IterativeCondition 
untilCondition =
+   (IterativeCondition) 
currentPattern.getUntilCondition();
+   if (untilCondition != null) {
+   singletonState.addProceed(
+   
originalStateMap.get(proceedState.getName()),
+   new 
AndCondition<>(proceedCondition, untilCondition));
+   }
+   singletonState.addProceed(proceedState,
+   untilCondition != null
--- End diff --

Redundant check - see line 568


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-25 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r135366122
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -421,6 +437,15 @@ private void addStopStateToLooping(final State 
loopingState) {
untilCondition,
true);
 
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)
 &&
+   times.getFrom() != times.getTo()) {
+   if (untilCondition != null) {
+   State sinkStateCopy = 
copy(sinkState);
+   
originalStateMap.put(sinkState.getName(), sinkStateCopy);
--- End diff --

When are the old entries cleared in this map ?
Shall we consider using map which expires entries by TTL ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4296


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804949
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+ 

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804902
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+ 

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804887
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+ 

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804932
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+ 

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804764
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern 
pattern) {
untilCondition,
true);
 
-   final IterativeCondition proceedCondition = 
getTrueFunction();
+   IterativeCondition proceedCondition = 
getTrueFunction();
final State loopingState = 
createState(currentPattern.getName(), State.StateType.Normal);
-   loopingState.addProceed(sinkState, proceedCondition);
+
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   State sinkStateCopy = copy(sinkState);
--- End diff --

Make sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131804398
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern 
pattern) {
return copyOfSink;
}
 
+   private State copy(final State state) {
+   final State copyOfState = new 
State<>(state.getName(), state.getStateType());
--- End diff --

Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131629516
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131629581
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131629357
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131629322
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,1068 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyZeroOrMore() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyZeroOrMoreConsecutiveEndWithOptional() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131628265
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -386,6 +387,19 @@ private void checkPatternNameUniqueness(final Pattern 
pattern) {
return copyOfSink;
}
 
+   private State copy(final State state) {
+   final State copyOfState = new 
State<>(state.getName(), state.getStateType());
--- End diff --

The copy should be made with `createState` method which does ensure there 
is a unique name assigned, which is neccessary for the serialization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-07 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r131628087
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -637,9 +675,23 @@ private boolean isPatternOptional(Pattern 
pattern) {
untilCondition,
true);
 
-   final IterativeCondition proceedCondition = 
getTrueFunction();
+   IterativeCondition proceedCondition = 
getTrueFunction();
final State loopingState = 
createState(currentPattern.getName(), State.StateType.Normal);
-   loopingState.addProceed(sinkState, proceedCondition);
+
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   State sinkStateCopy = copy(sinkState);
--- End diff --

If I understood correctly the copies are needed only in casese where there 
is the `untilCondition`. Am I right? If so let's create the copy then. Right 
know there are dangling copies of the next when there is no `untilCondtion`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-02 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r130883414
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern 
pattern) {
true);
 
IterativeCondition proceedCondition = 
getTrueFunction();
-   if (currentPattern.getQuantifier().isGreedy()) {
-   proceedCondition = 
getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition));
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   proceedCondition = getGreedyCondition(
+   proceedCondition,
+   takeCondition,
+   ignoreCondition,
+   followingTakeCondition);;
}
final State loopingState = 
createState(currentPattern.getName(), State.StateType.Normal,
-   currentPattern.getQuantifier().isGreedy());
+   
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
loopingState.addProceed(sinkState, proceedCondition);
loopingState.addTake(takeCondition);
 
addStopStateToLooping(loopingState);
 
if (ignoreCondition != null) {
final State ignoreState = 
createState(currentPattern.getName(), State.StateType.Normal,
-   
currentPattern.getQuantifier().isGreedy());
+   
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
ignoreState.addTake(loopingState, 
takeCondition);
ignoreState.addIgnore(ignoreCondition);
loopingState.addIgnore(ignoreState, 
ignoreCondition);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
--- End diff --

@dawidwys  Good advice. Thanks a lot :). I have updated the PR per the 
solution you suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-08-01 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r130537682
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ---
@@ -657,25 +663,34 @@ private boolean isPatternOptional(Pattern 
pattern) {
true);
 
IterativeCondition proceedCondition = 
getTrueFunction();
-   if (currentPattern.getQuantifier().isGreedy()) {
-   proceedCondition = 
getGreedyCondition(proceedCondition, Lists.newArrayList(takeCondition));
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
+   proceedCondition = getGreedyCondition(
+   proceedCondition,
+   takeCondition,
+   ignoreCondition,
+   followingTakeCondition);;
}
final State loopingState = 
createState(currentPattern.getName(), State.StateType.Normal,
-   currentPattern.getQuantifier().isGreedy());
+   
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
loopingState.addProceed(sinkState, proceedCondition);
loopingState.addTake(takeCondition);
 
addStopStateToLooping(loopingState);
 
if (ignoreCondition != null) {
final State ignoreState = 
createState(currentPattern.getName(), State.StateType.Normal,
-   
currentPattern.getQuantifier().isGreedy());
+   
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY));
ignoreState.addTake(loopingState, 
takeCondition);
ignoreState.addIgnore(ignoreCondition);
loopingState.addIgnore(ignoreState, 
ignoreCondition);
+   if 
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
 {
--- End diff --

I think the problem you mentioned with 
`GreedyITCase.testGreedyZeroOrMoreBeforeOptional2` is due to that proceed. The 
whole reason behind the additional ignoreState was for it not to have the 
`PROCEED` transition as it creates additional computationStates with the 
`sinkState`. I think for the greedy to work we need to put customized 
ignoreCondition into the `sinkState` of a greedy Pattern. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129832284
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyFollowedBy() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyUtil() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 3.0);
+   Event a3 = new Event(43, "a", 4.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129830324
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -105,6 +107,14 @@ public void optional() {
properties.add(Quantifier.QuantifierProperty.OPTIONAL);
}
 
+   public void greedy() {
+   greedy = true;
--- End diff --

Make sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-27 Thread dianfu
Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129830367
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() {
"Current quantifier is: " + quantifier);
}
}
+
+   private void checkIfNoFollowedByAny() {
+   if (quantifier.getConsumingStrategy() == 
ConsumingStrategy.SKIP_TILL_ANY) {
--- End diff --

Make sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129263762
  
--- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link Pattern#greedy()}.
+ */
+public class GreedyITCase extends TestLogger {
+
+   @Test
+   public void testGreedyFollowedBy() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 2.0);
+   Event a3 = new Event(43, "a", 2.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   Pattern pattern = 
Pattern.begin("start").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("c");
+   }
+   }).followedBy("middle").where(new SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("a");
+   }
+   }).oneOrMore().optional().greedy().followedBy("end").where(new 
SimpleCondition() {
+   private static final long serialVersionUID = 
5726188262756267490L;
+
+   @Override
+   public boolean filter(Event value) throws Exception {
+   return value.getName().equals("d");
+   }
+   });
+
+   NFA nfa = NFACompiler.compile(pattern, 
Event.createTypeSerializer(), false);
+
+   final List resultingPatterns = 
feedNFA(inputEvents, nfa);
+
+   compareMaps(resultingPatterns, Lists.newArrayList(
+   Lists.newArrayList(c, a1, a2, a3, d)
+   ));
+   }
+
+   @Test
+   public void testGreedyUtil() {
+   List inputEvents = new ArrayList<>();
+
+   Event c = new Event(40, "c", 1.0);
+   Event a1 = new Event(41, "a", 2.0);
+   Event a2 = new Event(42, "a", 3.0);
+   Event a3 = new Event(43, "a", 4.0);
+   Event d = new Event(44, "d", 3.0);
+
+   inputEvents.add(new StreamRecord<>(c, 1));
+   inputEvents.add(new StreamRecord<>(a1, 2));
+   inputEvents.add(new StreamRecord<>(a2, 3));
+   inputEvents.add(new StreamRecord<>(a3, 4));
+   inputEvents.add(new StreamRecord<>(d, 5));
+
+   // c a* d
+   

[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129261577
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 ---
@@ -105,6 +107,14 @@ public void optional() {
properties.add(Quantifier.QuantifierProperty.OPTIONAL);
}
 
+   public void greedy() {
+   greedy = true;
--- End diff --

How about applying greedy to SINGLETON state? I think it is illegal 
combination. Also maybe change GREEDY into `QuantifierProperty`? It will 
decrease the number of methods/fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/4296#discussion_r129262270
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 ---
@@ -492,4 +506,10 @@ private void checkIfQuantifierApplied() {
"Current quantifier is: " + quantifier);
}
}
+
+   private void checkIfNoFollowedByAny() {
+   if (quantifier.getConsumingStrategy() == 
ConsumingStrategy.SKIP_TILL_ANY) {
--- End diff --

I think it is a valid combination. I think the InnerConsumingStrategy 
should not be `SKIP_TILL_ANY`.

So `followedByAny("loop").oneOrMore().greedy()` in my opinion is a valid 
one, but `followedByAny("loop").oneOrMore().allowCombinations().greedy()` is 
not. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4296: [FLINK-7147] [cep] Support greedy quantifier in CE...

2017-07-11 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4296

[FLINK-7147] [cep] Support greedy quantifier in CEP

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink greedy_quantifier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4296


commit 647d4c58309990d9fc924bc4343ef811dacf4ef1
Author: Dian Fu 
Date:   2017-07-11T08:03:42Z

[FLINK-7147] [cep] Support greedy quantifier in CEP




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---