[jira] [Commented] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110511#comment-16110511
 ] 

ASF GitHub Bot commented on FLINK-7280:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4406


> Wrong clearing SharedBuffer of Equal elements with same Timestamp
> -
>
> Key: FLINK-7280
> URL: https://issues.apache.org/jira/browse/FLINK-7280
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> The following tests fail right now:
> {code}
> @Test
> public void testClearingBuffer() throws Exception {
>   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event b1 = new Event(41, "b", 2.0);
>   Event c1 = new Event(41, "c", 2.0);
>   Event d = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(b1, 2));
>   inputEvents.add(new StreamRecord<>(c1, 2));
>   inputEvents.add(new StreamRecord<>(d, 2));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("b").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("b");
>   }
>   }).followedBy("c").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("c");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
>   Lists.newArrayList(a1, b1, c1, d)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}
> {code}
> @Test
> public void testClearingBufferWithUntilAtTheEnd() throws Exception {
>   List<StreamRecord<Event>> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event d1 = new Event(41, "d", 2.0);
>   Event d2 = new Event(41, "d", 2.0);
>   Event d3 = new Event(41, "d", 2.0);
>   Event d4 = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(d1, 2));
>   inputEvents.add(new StreamRecord<>(d2, 2));
>   inputEvents.add(new StreamRecord<>(d3, 2));
>   inputEvents.add(new StreamRecord<>(d4, 4));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   }).oneOrMore().until(new IterativeCondition() {
>   @Override
>   public boolean filter(Event value, Context ctx) throws 
> Exception {
>   return 
> Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
>   Lists.newArrayList(a1, d1, d2, d3),
>   Lists.newArrayList(a1, d1, d2),
>   Lists.newArrayList(a1, d1)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16110504#comment-16110504
 ] 

ASF GitHub Bot commented on FLINK-7280:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4406
  
merging


> Wrong clearing SharedBuffer of Equal elements with same Timestamp
> -
>
> Key: FLINK-7280
> URL: https://issues.apache.org/jira/browse/FLINK-7280
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> The following tests fail right now:
> {code}
> @Test
> public void testClearingBuffer() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event b1 = new Event(41, "b", 2.0);
>   Event c1 = new Event(41, "c", 2.0);
>   Event d = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(b1, 2));
>   inputEvents.add(new StreamRecord<>(c1, 2));
>   inputEvents.add(new StreamRecord<>(d, 2));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("b").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("b");
>   }
>   }).followedBy("c").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("c");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, b1, c1, d)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}
> {code}
> @Test
> public void testClearingBufferWithUntilAtTheEnd() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event d1 = new Event(41, "d", 2.0);
>   Event d2 = new Event(41, "d", 2.0);
>   Event d3 = new Event(41, "d", 2.0);
>   Event d4 = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(d1, 2));
>   inputEvents.add(new StreamRecord<>(d2, 2));
>   inputEvents.add(new StreamRecord<>(d3, 2));
>   inputEvents.add(new StreamRecord<>(d4, 4));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   }).oneOrMore().until(new IterativeCondition() {
>   @Override
>   public boolean filter(Event value, Context ctx) throws 
> Exception {
>   return 
> Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, d1, d2, d3),
>   Lists.newArrayList(a1, d1, d2),
>   Lists.newArrayList(a1, d1)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109294#comment-16109294
 ] 

ASF GitHub Bot commented on FLINK-7280:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4406
  
@dawidwys The changes look good! I checked with `oneOrMore()` and 
`zeroOrMore()` to see if we prematurely clean up the state and it seems to be 
ok.

+1 to merge.


> Wrong clearing SharedBuffer of Equal elements with same Timestamp
> -
>
> Key: FLINK-7280
> URL: https://issues.apache.org/jira/browse/FLINK-7280
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> The following tests fail right now:
> {code}
> @Test
> public void testClearingBuffer() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event b1 = new Event(41, "b", 2.0);
>   Event c1 = new Event(41, "c", 2.0);
>   Event d = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(b1, 2));
>   inputEvents.add(new StreamRecord<>(c1, 2));
>   inputEvents.add(new StreamRecord<>(d, 2));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("b").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("b");
>   }
>   }).followedBy("c").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("c");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, b1, c1, d)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}
> {code}
> @Test
> public void testClearingBufferWithUntilAtTheEnd() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event d1 = new Event(41, "d", 2.0);
>   Event d2 = new Event(41, "d", 2.0);
>   Event d3 = new Event(41, "d", 2.0);
>   Event d4 = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(d1, 2));
>   inputEvents.add(new StreamRecord<>(d2, 2));
>   inputEvents.add(new StreamRecord<>(d3, 2));
>   inputEvents.add(new StreamRecord<>(d4, 4));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   }).oneOrMore().until(new IterativeCondition() {
>   @Override
>   public boolean filter(Event value, Context ctx) throws 
> Exception {
>   return 
> Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, d1, d2, d3),
>   Lists.newArrayList(a1, d1, d2),
>   Lists.newArrayList(a1, d1)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108562#comment-16108562
 ] 

ASF GitHub Bot commented on FLINK-7280:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4406
  
@dawidwys I will have a look later today!


> Wrong clearing SharedBuffer of Equal elements with same Timestamp
> -
>
> Key: FLINK-7280
> URL: https://issues.apache.org/jira/browse/FLINK-7280
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> The following tests fail right now:
> {code}
> @Test
> public void testClearingBuffer() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event b1 = new Event(41, "b", 2.0);
>   Event c1 = new Event(41, "c", 2.0);
>   Event d = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(b1, 2));
>   inputEvents.add(new StreamRecord<>(c1, 2));
>   inputEvents.add(new StreamRecord<>(d, 2));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("b").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("b");
>   }
>   }).followedBy("c").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("c");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, b1, c1, d)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}
> {code}
> @Test
> public void testClearingBufferWithUntilAtTheEnd() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event d1 = new Event(41, "d", 2.0);
>   Event d2 = new Event(41, "d", 2.0);
>   Event d3 = new Event(41, "d", 2.0);
>   Event d4 = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(d1, 2));
>   inputEvents.add(new StreamRecord<>(d2, 2));
>   inputEvents.add(new StreamRecord<>(d3, 2));
>   inputEvents.add(new StreamRecord<>(d4, 4));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   }).oneOrMore().until(new IterativeCondition() {
>   @Override
>   public boolean filter(Event value, Context ctx) throws 
> Exception {
>   return 
> Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, d1, d2, d3),
>   Lists.newArrayList(a1, d1, d2),
>   Lists.newArrayList(a1, d1)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

2017-07-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16102956#comment-16102956
 ] 

ASF GitHub Bot commented on FLINK-7280:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/4406

[FLINK-7280] Wrong clearing SharedBuffer of Equal elements with same …


## What is the purpose of the change

Fixes a bug where the NFA was not cleared when the matched sequence 
ended with elements that share the same timestamp.


## Brief change log

- clear the elements using the counter of the newly created computation state 
instead of that of the previous one


## Verifying this change

This change added tests and can be verified as follows:

- Added tests for the case where a sequence ends with elements of the same timestamp

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-same-elements-bug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4406


commit 410822caa2ecd60395570dae535e11909f56810f
Author: Dawid Wysakowicz 
Date:   2017-07-27T08:45:17Z

[FLINK-7280] Wrong clearing SharedBuffer of Equal elements with same 
Timestamp




> Wrong clearing SharedBuffer of Equal elements with same Timestamp
> -
>
> Key: FLINK-7280
> URL: https://issues.apache.org/jira/browse/FLINK-7280
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.4.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> The following tests fail right now:
> {code}
> @Test
> public void testClearingBuffer() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event b1 = new Event(41, "b", 2.0);
>   Event c1 = new Event(41, "c", 2.0);
>   Event d = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(b1, 2));
>   inputEvents.add(new StreamRecord<>(c1, 2));
>   inputEvents.add(new StreamRecord<>(d, 2));
>   Pattern pattern = Pattern.begin("a").where(new 
> SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("a");
>   }
>   }).followedBy("b").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("b");
>   }
>   }).followedBy("c").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("c");
>   }
>   }).followedBy("d").where(new SimpleCondition() {
>   @Override
>   public boolean filter(Event value) throws Exception {
>   return value.getName().equals("d");
>   }
>   });
>   NFA nfa = NFACompiler.compile(pattern, 
> Event.createTypeSerializer(), false);
>   List> resultingPatterns = feedNFA(inputEvents, nfa);
>   compareMaps(resultingPatterns, Lists.>newArrayList(
>   Lists.newArrayList(a1, b1, c1, d)
>   ));
>   assertTrue(nfa.isEmpty());
> }
> {code}
> {code}
> @Test
> public void testClearingBufferWithUntilAtTheEnd() throws Exception {
>   List> inputEvents = new ArrayList<>();
>   Event a1 = new Event(40, "a", 1.0);
>   Event d1 = new Event(41, "d", 2.0);
>   Event d2 = new Event(41, "d", 2.0);
>   Event d3 = new Event(41, "d", 2.0);
>   Event d4 = new Event(41, "d", 2.0);
>   inputEvents.add(new StreamRecord<>(a1, 1));
>   inputEvents.add(new StreamRecord<>(d1, 2));
>   inputEvents.add(new StreamRecord<>(d2, 2));
>   inputEvents.add(new StreamRecord<>(d3, 2));
>   inputEvents