[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585165#comment-16585165
 ] 

Chesnay Schepler commented on FLINK-9642:
-

[~dawidwys] Can you give this issue a better name?

> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the SharedBuffer in FLINK-9418, the lock and release 
> operations now go directly to RocksDB state, whereas the previous version 
> read the whole SharedBuffer state into memory. I think we can add a cache or 
> a variable in SharedBuffer that holds the Lockable objects and records the 
> reference-count changes made during a single NFA processing pass. This 
> reduces the number of state accesses when several events point to the same 
> NodeId. The result is flushed to MapState at the end of processing. 
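
The proposal in the description can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `RefCountCache`, its methods, and the plain `HashMap` that stands in for the RocksDB-backed MapState are all hypothetical names. It shows how several lock and release calls on the same node id collapse into one state read and one state write per processing pass.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical write-back cache for reference counts, keyed by node id. */
final class RefCountCache {
    // Plain map standing in for the RocksDB-backed MapState (an assumption).
    private final Map<String, Integer> state = new HashMap<>();
    // In-memory copies of the entries touched during the current pass.
    private final Map<String, Integer> cache = new HashMap<>();
    // Counts simulated state-backend reads and writes.
    private int stateAccesses = 0;

    /** Increments the reference count of a node in the cache. */
    void lock(String nodeId) {
        cache.put(nodeId, load(nodeId) + 1);
    }

    /** Decrements the reference count of a node, never below zero. */
    void release(String nodeId) {
        cache.put(nodeId, Math.max(0, load(nodeId) - 1));
    }

    /** Writes every cached count back to the state once, then clears the cache. */
    void flush() {
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            state.put(e.getKey(), e.getValue());
            stateAccesses++;
        }
        cache.clear();
    }

    int stateAccesses() {
        return stateAccesses;
    }

    int persistedCount(String nodeId) {
        return state.getOrDefault(nodeId, 0);
    }

    // Returns the cached count, reading the state only on a cache miss.
    private int load(String nodeId) {
        Integer cached = cache.get(nodeId);
        if (cached != null) {
            return cached;
        }
        stateAccesses++;
        return state.getOrDefault(nodeId, 0);
    }
}

final class RefCountCacheDemo {
    public static void main(String[] args) {
        RefCountCache buffer = new RefCountCache();
        // Several events point to the same node within one processing pass.
        buffer.lock("node-1");
        buffer.lock("node-1");
        buffer.lock("node-1");
        buffer.release("node-1");
        buffer.flush();
        // One read on the first cache miss plus one write on flush.
        System.out.println(buffer.stateAccesses() + " state accesses, count "
            + buffer.persistedCount("node-1"));
    }
}
```

Without the cache, each of the four operations in the demo would need its own read and write against the state backend.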



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582664#comment-16582664
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-413578231
 
 
   Sorry for the slow response. I will try to have a final look next week, but 
from what I've seen so far it looks quite good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582654#comment-16582654
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-413577032
 
 
   Hi @dawidwys, could you review this again? Thank you.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577596#comment-16577596
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-412344931
 
 
   The failed Travis CI build seems unrelated to this PR.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577207#comment-16577207
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-412280321
 
 
   I have fixed the places you pointed out and added a test that covers how 
elements change in the state and in the cache. Please take another look, 
thanks. @dawidwys 




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577090#comment-16577090
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209419821
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -161,40 +100,30 @@ public void init(
         eventsCount.putAll(maxIds);
     }
 
-    /**
-     * Stores given value (value + timestamp) under the given state. It assigns a preceding element
-     * relation to the previous entry.
-     *
-     * @param stateName      name of the state that the event should be assigned to
-     * @param eventId        unique id of event assigned by this SharedBuffer
-     * @param previousNodeId id of previous entry (might be null if start of new run)
-     * @param version        Version of the previous relation
-     * @return assigned id of this element
-     * @throws Exception Thrown if the system cannot access the state.
-     */
-    public NodeId put(
-            final String stateName,
-            final EventId eventId,
-            @Nullable final NodeId previousNodeId,
-            final DeweyNumber version) throws Exception {
+    public SharedBufferAccessor<V> getAccessor() {
+        return new SharedBufferAccessor<>(this);
+    }
 
-        if (previousNodeId != null) {
-            lockNode(previousNodeId);
+    public void advanceTime(long timestamp) throws Exception {
 
 Review comment:
   This needs to be called from the nfa package, so I think it should be public.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576269#comment-16576269
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209251987
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
      * @throws Exception Thrown if the system cannot access the state.
      */
     public boolean isEmpty() throws Exception {
-        return Iterables.isEmpty(eventsBuffer.keys());
+        return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet());
     }
 
     /**
-     * Returns all elements from the previous relation starting at the given entry.
-     *
-     * @param nodeId  id of the starting entry
-     * @param version Version of the previous relation which shall be extracted
-     * @return Collection of previous relations starting with the given value
-     * @throws Exception Thrown if the system cannot access the state.
+     * Put an event to cache.
+     * @param eventId id of the event
+     * @param event event body
      */
-    public List<Map<String, List<EventId>>> extractPatterns(
-            final NodeId nodeId,
-            final DeweyNumber version) throws Exception {
-
-        List<Map<String, List<EventId>>> result = new ArrayList<>();
-
-        // stack to remember the current extraction states
-        Stack<ExtractionState> extractionStates = new Stack<>();
-
-        // get the starting shared buffer entry for the previous relation
-        Lockable<SharedBufferNode> entryLock = entries.get(nodeId);
-
-        if (entryLock != null) {
-            SharedBufferNode entry = entryLock.getElement();
-            extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-            // use a depth first search to reconstruct the previous relations
-            while (!extractionStates.isEmpty()) {
-                final ExtractionState extractionState = extractionStates.pop();
-                // current path of the depth first search
-                final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
-                final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
-
-                // termination criterion
-                if (currentEntry == null) {
-                    final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
-
-                    while (!currentPath.isEmpty()) {
-                        final NodeId currentPathEntry = currentPath.pop().f0;
-
-                        String page = currentPathEntry.getPageName();
-                        List<EventId> values = completePath
-                            .computeIfAbsent(page, k -> new ArrayList<>());
-                        values.add(currentPathEntry.getEventId());
-                    }
-                    result.add(completePath);
-                } else {
-
-                    // append state to the path
-                    currentPath.push(currentEntry);
-
-                    boolean firstMatch = true;
-                    for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
-                        // we can only proceed if the current version is compatible to the version
-                        // of this previous relation
-                        final DeweyNumber currentVersion = extractionState.getVersion();
-                        if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-                            final NodeId target = edge.getTarget();
-                            Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
-
-                            if (firstMatch) {
-                                // for the first match we don't have to copy the current path
-                                newPath = currentPath;
-                                firstMatch = false;
-                            } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576268#comment-16576268
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249252
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
      * @throws Exception Thrown if the system cannot access the state.
      */
     public boolean isEmpty() throws Exception {
-        return Iterables.isEmpty(eventsBuffer.keys());
+        return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet());
 
 Review comment:
   Check the cache first. If there is something in the cache, we won't need to 
access the state.
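
A minimal sketch of this suggestion, assuming a simplified buffer: `CacheFirstBuffer`, `putToCache`, and the read counter are hypothetical, and plain maps stand in for the cache and the state. Because `&&` evaluates its right operand only when the left one is true, putting the in-memory check first skips the state lookup whenever the cache holds anything.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical buffer with an in-memory cache in front of a costly state lookup. */
final class CacheFirstBuffer {
    private final Map<Integer, String> eventsBufferCache = new HashMap<>();
    // Plain map standing in for the state backend (an assumption).
    private final Map<Integer, String> eventsBuffer = new HashMap<>();
    // Counts how often the simulated state was consulted.
    private int stateReads = 0;

    void putToCache(int eventId, String event) {
        eventsBufferCache.put(eventId, event);
    }

    /** The cache check comes first, so && skips the state read when the cache is non-empty. */
    boolean isEmpty() {
        return eventsBufferCache.isEmpty() && stateIsEmpty();
    }

    int stateReads() {
        return stateReads;
    }

    // Simulates the expensive lookup and records that it happened.
    private boolean stateIsEmpty() {
        stateReads++;
        return eventsBuffer.isEmpty();
    }
}
```

With the operands in the order used in the diff, the state would be read on every call, even when the cache alone already answers the question.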




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576254#comment-16576254
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234513
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable{
+
+    /** The cache of sharedBuffer.*/
+    private SharedBuffer<V> sharedBuffer;
+
+    public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+        this.sharedBuffer = sharedBuffer;
+    }
+
+    public void setSharedBuffer(SharedBuffer<V> sharedBuffer) {
+        this.sharedBuffer = sharedBuffer;
+    }
+
+    /**
+     * Notifies shared buffer that there will be no events with timestamp &lt;&#61; the given value. It allows to clear
+     * internal counters for number of events seen so far per timestamp.
+     *
+     * @param timestamp watermark, no earlier events will arrive
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public void advanceTime(long timestamp) throws Exception {
+        sharedBuffer.advanceTime(timestamp);
+    }
+
+    /**
+     * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+     * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+     * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+     *
+     * NOTE: Should be called only once for each unique event!
+     *
+     * @param value event to be registered
+     * @return unique id of that event that should be used when putting entries to the buffer.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public EventId registerEvent(V value, long timestamp) throws Exception {
+        return sharedBuffer.registerEvent(value, timestamp);
+    }
+
+    /**
+     * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+     * relation to the previous entry.
+     *
+     * @param stateName  name of the state that the 
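
The accessor in the diff above implements AutoCloseable, which suggests it is meant to be used in a try-with-resources block. The sketch below illustrates that pattern under one loud assumption: the quoted diff is cut off before `close()`, so having `close()` flush the cache is a guess based on the issue description, not the author's code. `SketchBuffer`, `SketchAccessor`, `lockNode`, and `flushCache` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical buffer that owns the persistent store and a per-pass cache. */
final class SketchBuffer {
    // Plain map standing in for MapState (an assumption).
    final Map<String, Integer> state = new HashMap<>();
    // Pending reference-count increments collected during one pass.
    final Map<String, Integer> cache = new HashMap<>();

    SketchAccessor getAccessor() {
        return new SketchAccessor(this);
    }

    /** Adds every pending increment to the persisted count, then clears the cache. */
    void flushCache() {
        cache.forEach((nodeId, delta) -> state.merge(nodeId, delta, Integer::sum));
        cache.clear();
    }
}

/** Hypothetical accessor; close() flushing the cache is an assumption. */
final class SketchAccessor implements AutoCloseable {
    private final SketchBuffer buffer;

    SketchAccessor(SketchBuffer buffer) {
        this.buffer = buffer;
    }

    /** Records one more reference to the node in the cache only. */
    void lockNode(String nodeId) {
        buffer.cache.merge(nodeId, 1, Integer::sum);
    }

    @Override
    public void close() {
        buffer.flushCache();
    }
}

final class AccessorDemo {
    public static void main(String[] args) {
        SketchBuffer buffer = new SketchBuffer();
        try (SketchAccessor accessor = buffer.getAccessor()) {
            accessor.lockNode("node-1");
            accessor.lockNode("node-1");
            // Nothing has reached the state yet.
            System.out.println(buffer.state.isEmpty());
        }
        // close() ran at the end of the try block and flushed the cache.
        System.out.println(buffer.state.get("node-1"));
    }
}
```

Tying the flush to `close()` means a caller cannot forget to write the cached changes back, even if processing throws.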

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576262#comment-16576262
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248594
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576266#comment-16576266
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234729
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576252#comment-16576252
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233907
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp 
 the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576249#comment-16576249
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233807
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp 
 the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576259#comment-16576259
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248515
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack> 
currentPath = extractionState.getPath();
-   final Tuple2 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576261#comment-16576261
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209251047
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack> 
currentPath = extractionState.getPath();
-   final Tuple2 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576250#comment-16576250
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233134
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp 
 the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576270#comment-16576270
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249647
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -25,58 +25,37 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.DeweyNumber;
-import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
-import org.apache.commons.lang3.StringUtils;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Stack;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
- * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
- * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
- *
- * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
- * we do not need to deserialize events during processing and we store only 
one copy of the event.
- *
- * The entries in {@link SharedBuffer} are {@link SharedBufferNode}. The 
shared buffer node allows to store
- * relations between different entries. A dewey versioning scheme allows to 
discriminate between
- * different relations (e.g. preceding element).
- *
- * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
- *
- * @param <V> Type of the values
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
- * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ * This is a helper class of {@link SharedBufferAccessor}. It do the cache of 
the underlay sharedBuffer state
+ * during a nfa process. It can reduce the state access when the ref change is 
requested several times on
+ * a same {@code Lockable} Object. And it also implements the {@code 
AutoCloseable} interface to flush the
 
 Review comment:
   Outdated javadoc. It is no longer Autoclosable.
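
   For readers following the thread, the caching idea this javadoc describes
   can be sketched roughly as below. This is only an illustration under stated
   assumptions, not the code from the pull request: RefCountCacheSketch,
   CountingStore and RefCountCache are hypothetical names, and a plain HashMap
   stands in for the RocksDB-backed state. Reference-count changes on the same
   key are accumulated in memory and written back once at the end of a
   processing step.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Flink's SharedBuffer): batch ref-count changes, flush once. */
public class RefCountCacheSketch {

    /** Stand-in for a keyed state backend; it only counts how often it is accessed. */
    static final class CountingStore<K> {
        private final Map<K, Integer> data = new HashMap<>();
        int reads;
        int writes;

        int get(K key) {
            reads++;
            return data.getOrDefault(key, 0);
        }

        void put(K key, int refCount) {
            writes++;
            if (refCount <= 0) {
                data.remove(key); // nothing references the entry any more
            } else {
                data.put(key, refCount);
            }
        }
    }

    /** Holds ref counts in memory for one processing step. */
    static final class RefCountCache<K> {
        private final CountingStore<K> store;
        private final Map<K, Integer> pending = new HashMap<>();

        RefCountCache(CountingStore<K> store) {
            this.store = store;
        }

        void lock(K key) {
            change(key, 1);
        }

        void release(K key) {
            change(key, -1);
        }

        private void change(K key, int delta) {
            // The store is read only the first time a key is touched in this step.
            int current = pending.computeIfAbsent(key, store::get);
            pending.put(key, current + delta);
        }

        /** Writes every touched key back exactly once and empties the cache. */
        void flush() {
            for (Map.Entry<K, Integer> entry : pending.entrySet()) {
                store.put(entry.getKey(), entry.getValue());
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        CountingStore<String> store = new CountingStore<>();
        RefCountCache<String> cache = new RefCountCache<>(store);
        cache.lock("node-1");
        cache.lock("node-1");
        cache.lock("node-1");
        cache.release("node-1");
        cache.flush();
        System.out.println("reads=" + store.reads + ", writes=" + store.writes);
    }
}
```

   In this sketch four ref changes on the same key cost one read and one write
   against the store instead of four of each. The trade-off is that the flush
   has to happen before the state is checkpointed or read by anyone else.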




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576245#comment-16576245
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209232519
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer sharedBuffer) {
 
 Review comment:
   This method is unnecessary.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576257#comment-16576257
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248308
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack<Tuple2<NodeId, SharedBufferNode>> 
currentPath = extractionState.getPath();
-   final Tuple2<NodeId, SharedBufferNode> 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map<String, List<EventId>> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List<EventId> values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576263#comment-16576263
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248405
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576255#comment-16576255
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209252113
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576248#comment-16576248
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233765
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer<V> sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer<V> sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp 
 the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576251#comment-16576251
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209233285
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576265#comment-16576265
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249984
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
 
 Review comment:
   Let's move this description to `SharedBuffer` and replace it with something 
along these lines: "Accessor to SharedBuffer that allows operations on the 
underlying structures in batches. Operations are persisted only after closing 
the Accessor."
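
   To illustrate the batching behaviour this comment describes, here is a 
minimal Java sketch. It is not the code from the pull request: `BackingStore` 
and `BatchingAccessor` are hypothetical names, and a plain `HashMap` stands in 
for the state backend. Reference-count changes are only cached while the 
accessor is open and are written to the store once, in `close()`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: BackingStore and BatchingAccessor are hypothetical, not Flink classes.
class BatchingAccessorDemo {

    /** Stand-in for a persistent store such as keyed state; counts every write. */
    static class BackingStore {
        private final Map<String, Integer> persisted = new HashMap<>();
        int writeCount = 0;

        void write(String key, int value) {
            persisted.put(key, value);
            writeCount++;
        }

        Integer read(String key) {
            return persisted.get(key);
        }
    }

    /** Buffers reference-count changes in memory and persists them once, on close(). */
    static class BatchingAccessor implements AutoCloseable {
        private final BackingStore store;
        private final Map<String, Integer> cache = new HashMap<>();

        BatchingAccessor(BackingStore store) {
            this.store = store;
        }

        /** Increments the reference count of a node in the cache only. */
        void lock(String nodeId) {
            cache.put(nodeId, currentCount(nodeId) + 1);
        }

        /** Decrements the reference count of a node in the cache only. */
        void release(String nodeId) {
            cache.put(nodeId, currentCount(nodeId) - 1);
        }

        /** Reads the cached count first and falls back to the store, then to zero. */
        private int currentCount(String nodeId) {
            Integer cached = cache.get(nodeId);
            if (cached != null) {
                return cached;
            }
            Integer stored = store.read(nodeId);
            return stored == null ? 0 : stored;
        }

        /** Flushes every cached count to the store with one write per node. */
        @Override
        public void close() {
            for (Map.Entry<String, Integer> entry : cache.entrySet()) {
                store.write(entry.getKey(), entry.getValue());
            }
            cache.clear();
        }
    }

    public static void main(String[] args) {
        BackingStore store = new BackingStore();
        try (BatchingAccessor accessor = new BatchingAccessor(store)) {
            accessor.lock("node-1");
            accessor.lock("node-1");
            accessor.release("node-1");
            System.out.println("writes before close: " + store.writeCount);
        }
        System.out.println("writes after close: " + store.writeCount);
        System.out.println("stored count: " + store.read("node-1"));
    }
}
```

   In this sketch three operations on the same node result in a single write, 
which is the saving the issue is after when several events point to the same 
NodeId. Using try-with-resources makes it hard to forget the flush.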




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576264#comment-16576264
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209253795
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -161,40 +100,30 @@ public void init(
eventsCount.putAll(maxIds);
}
 
-   /**
-* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
-* relation to the previous entry.
-*
-* @param stateName  name of the state that the event should be 
assigned to
-* @param eventIdunique id of event assigned by this 
SharedBuffer
-* @param previousNodeId id of previous entry (might be null if start 
of new run)
-* @param versionVersion of the previous relation
-* @return assigned id of this element
-* @throws Exception Thrown if the system cannot access the state.
-*/
-   public NodeId put(
-   final String stateName,
-   final EventId eventId,
-   @Nullable final NodeId previousNodeId,
-   final DeweyNumber version) throws Exception {
+   public SharedBufferAccessor getAccessor() {
+   return new SharedBufferAccessor<>(this);
+   }
 
-   if (previousNodeId != null) {
-   lockNode(previousNodeId);
+   public void advanceTime(long timestamp) throws Exception {
 
 Review comment:
   This could be package protected.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576256#comment-16576256
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209250690
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576253#comment-16576253
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209235080
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ##
 @@ -373,10 +373,11 @@ private void updateNFA(NFAState nfaState) throws 
IOException {
 * @param nfaState Our NFAState object
 * @param event The current event to be processed
 * @param timestamp The timestamp of the event
+* @param sharedBuffer The sharedBuffer of the process
 */
-   private void processEvent(NFAState nfaState, IN event, long timestamp) 
throws Exception {
+   private void processEvent(NFAState nfaState, IN event, long timestamp, 
SharedBuffer sharedBuffer) throws Exception {
 
 Review comment:
   I see no reason for this change.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576246#comment-16576246
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209232912
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
 
 Review comment:
   Let's not call it a cache. It is a fault-tolerant structure; the caching is 
the `SharedBuffer`'s responsibility.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576258#comment-16576258
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248958
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -204,227 +133,78 @@ public NodeId put(
 * @throws Exception Thrown if the system cannot access the state.
 */
public boolean isEmpty() throws Exception {
-   return Iterables.isEmpty(eventsBuffer.keys());
+   return Iterables.isEmpty(eventsBuffer.keys()) && 
Iterables.isEmpty(eventsBufferCache.keySet());
}
 
/**
-* Returns all elements from the previous relation starting at the 
given entry.
-*
-* @param nodeId  id of the starting entry
-* @param version Version of the previous relation which shall be 
extracted
-* @return Collection of previous relations starting with the given 
value
-* @throws Exception Thrown if the system cannot access the state.
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
 */
-   public List>> extractPatterns(
-   final NodeId nodeId,
-   final DeweyNumber version) throws Exception {
-
-   List>> result = new ArrayList<>();
-
-   // stack to remember the current extraction states
-   Stack extractionStates = new Stack<>();
-
-   // get the starting shared buffer entry for the previous 
relation
-   Lockable entryLock = entries.get(nodeId);
-
-   if (entryLock != null) {
-   SharedBufferNode entry = entryLock.getElement();
-   extractionStates.add(new 
ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-   // use a depth first search to reconstruct the previous 
relations
-   while (!extractionStates.isEmpty()) {
-   final ExtractionState extractionState = 
extractionStates.pop();
-   // current path of the depth first search
-   final Stack> 
currentPath = extractionState.getPath();
-   final Tuple2 
currentEntry = extractionState.getEntry();
-
-   // termination criterion
-   if (currentEntry == null) {
-   final Map> 
completePath = new LinkedHashMap<>();
-
-   while (!currentPath.isEmpty()) {
-   final NodeId currentPathEntry = 
currentPath.pop().f0;
-
-   String page = 
currentPathEntry.getPageName();
-   List values = 
completePath
-   .computeIfAbsent(page, 
k -> new ArrayList<>());
-   
values.add(currentPathEntry.getEventId());
-   }
-   result.add(completePath);
-   } else {
-
-   // append state to the path
-   currentPath.push(currentEntry);
-
-   boolean firstMatch = true;
-   for (SharedBufferEdge edge : 
currentEntry.f1.getEdges()) {
-   // we can only proceed if the 
current version is compatible to the version
-   // of this previous relation
-   final DeweyNumber 
currentVersion = extractionState.getVersion();
-   if 
(currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-   final NodeId target = 
edge.getTarget();
-   Stack> newPath;
-
-   if (firstMatch) {
-   // for the 
first match we don't have to copy the current path
-   newPath = 
currentPath;
-   firstMatch = 
false;
-   } else {
-   

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576267#comment-16576267
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249390
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -161,40 +100,30 @@ public void init(
eventsCount.putAll(maxIds);
}
 
-   /**
-* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
-* relation to the previous entry.
-*
-* @param stateName  name of the state that the event should be 
assigned to
-* @param eventIdunique id of event assigned by this 
SharedBuffer
-* @param previousNodeId id of previous entry (might be null if start 
of new run)
-* @param versionVersion of the previous relation
-* @return assigned id of this element
-* @throws Exception Thrown if the system cannot access the state.
-*/
-   public NodeId put(
-   final String stateName,
-   final EventId eventId,
-   @Nullable final NodeId previousNodeId,
-   final DeweyNumber version) throws Exception {
+   public SharedBufferAccessor getAccessor() {
 
 Review comment:
   Missing javadoc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576260#comment-16576260
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234589
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer<V> sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   public void setSharedBuffer(SharedBuffer<V> sharedBuffer) {
+   this.sharedBuffer = sharedBuffer;
+   }
+
+   /**
+* Notifies shared buffer that there will be no events with timestamp 
<= the given value. It allows to clear
+* internal counters for number of events seen so far per timestamp.
+*
+* @param timestamp watermark, no earlier events will arrive
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void advanceTime(long timestamp) throws Exception {
+   sharedBuffer.advanceTime(timestamp);
+   }
+
+   /**
+* Adds another unique event to the shared buffer and assigns a unique 
id for it. It automatically creates a
+* lock on this event, so it won't be removed during processing of that 
event. Therefore the lock should be removed
+* after processing all {@link 
org.apache.flink.cep.nfa.ComputationState}s
+*
+* NOTE:Should be called only once for each unique event!
+*
+* @param value event to be registered
+* @return unique id of that event that should be used when putting 
entries to the buffer.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public EventId registerEvent(V value, long timestamp) throws Exception {
+   return sharedBuffer.registerEvent(value, timestamp);
+   }
+
+   /**
+* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
+* relation to the previous entry.
+*
+* @param stateName  name of the state that the 

[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576247#comment-16576247
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209232579
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
 ##
 @@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static 
org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. 
Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in 
the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events 
with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only 
one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link 
SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to 
discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern 
Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable{
+
+   /** The cache of sharedBuffer.*/
+   private SharedBuffer<V> sharedBuffer;
+
+   public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
 
 Review comment:
   This ctor could be package-protected.
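
   A minimal sketch of what this suggestion might look like, using hypothetical 
stand-in classes (`BufferStandIn`, `AccessorStandIn`) rather than the PR's 
actual code. Dropping the `public` modifier makes the constructor 
package-private, so only classes in the same package, here the buffer's 
`getAccessor()`, can create an accessor:

```java
/** Hypothetical stand-in for the buffer; the only place accessors are created. */
class BufferStandIn {
    AccessorStandIn getAccessor() {
        return new AccessorStandIn(this);
    }
}

/** Hypothetical stand-in for the accessor. */
class AccessorStandIn implements AutoCloseable {
    private final BufferStandIn buffer;

    // No access modifier: package-private, so code outside this package cannot call it.
    AccessorStandIn(BufferStandIn buffer) {
        this.buffer = buffer;
    }

    BufferStandIn buffer() {
        return buffer;
    }

    @Override
    public void close() {
        // Assumption: in the PR's design, pending changes would be flushed here.
    }
}
```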


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575524#comment-16575524
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-411923422
 
 
   Thanks a lot for your review. I have changed it as you mentioned; please 
take another look. Thanks, @dawidwys


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574958#comment-16574958
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-411788642
 
 
   Hi @Aitozi 
   I found the dependencies between the accessor and the shared buffer a bit 
too two-sided. Please have a look at how I would loosen them in this 
branch: https://github.com/dawidwys/flink/tree/pr/6205. It does not compile, 
but it should show how I would split up those two classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565122#comment-16565122
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-409534657
 
 
   ping @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-07-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561174#comment-16561174
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-408690962
 
 
   I have been a little busy these days, which delayed this PR. I have now 
split `SharedBuffer` into:
   - `SharedBuffer`, which deals with the NFA queries
   - `SharedBufferAccessor`, which deals with the underlying `State` directly
   
   Please take another look @dawidwys, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540114#comment-16540114
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @Aitozi , 
Thanks for the work here. I think we could improve the separation of 
concerns in the SharedBuffer a bit. I am a bit afraid this class will become 
too complex once again. How about we split the SharedBuffer into two layers: 
a caching one (`SharedBuffer`) and an accessing one, e.g. `SharedBufferAccessor`? 
We could make the accessor `AutoCloseable`. I think that would make the need 
to flush the buffer more explicit.

We could move to `SharedBufferAccessor` methods like:
- registerEvent
- put
- extractPatterns
- materializeMatch
- materializeMatch
- lockNode
- releaseNode
- removeNode
- lockEvent
- releaseEvent

In the `SharedBuffer` we would leave only the caching package-protected 
methods. We would also add a method there to get the `SharedBufferAccessor` 
that would `flush` the changes on `close`. This way we would have a cleaner 
separation, plus we would make the flushing more intuitive.

What do you think?
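
The split proposed above could be sketched roughly as follows. This is a 
minimal illustration, not the PR's code: `CountingStateStore`, 
`SketchSharedBuffer` and `SketchAccessor` are hypothetical stand-ins (the 
store plays the role of Flink's keyed state and simply counts writes), and 
node ids are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the state backend; counts writes to make the saving visible. */
class CountingStateStore {
    private final Map<String, Integer> backend = new HashMap<>();
    int writes = 0;

    Integer get(String key) {
        return backend.get(key);
    }

    void put(String key, int refCount) {
        backend.put(key, refCount);
        writes++;
    }
}

/** Caching layer: keeps pending reference counts in memory until flushed. */
class SketchSharedBuffer {
    private final CountingStateStore state;
    private final Map<String, Integer> cache = new HashMap<>();

    SketchSharedBuffer(CountingStateStore state) {
        this.state = state;
    }

    /** Hands out an accessor whose close() flushes the cache. */
    SketchAccessor getAccessor() {
        return new SketchAccessor(this);
    }

    // Package-private caching methods, meant to be used only by the accessor.
    int refCount(String nodeId) {
        Integer cached = cache.get(nodeId);
        if (cached != null) {
            return cached;
        }
        Integer stored = state.get(nodeId);
        return stored == null ? 0 : stored;
    }

    void cacheRefCount(String nodeId, int refCount) {
        cache.put(nodeId, refCount);
    }

    void flushCache() {
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            state.put(e.getKey(), e.getValue());
        }
        cache.clear();
    }
}

/** Accessing layer: the API that the NFA code would call. */
class SketchAccessor implements AutoCloseable {
    private final SketchSharedBuffer buffer;

    SketchAccessor(SketchSharedBuffer buffer) {
        this.buffer = buffer;
    }

    void lockNode(String nodeId) {
        buffer.cacheRefCount(nodeId, buffer.refCount(nodeId) + 1);
    }

    void releaseNode(String nodeId) {
        buffer.cacheRefCount(nodeId, buffer.refCount(nodeId) - 1);
    }

    @Override
    public void close() {
        buffer.flushCache();
    }
}

class AccessorSketchDemo {
    public static void main(String[] args) {
        CountingStateStore state = new CountingStateStore();
        SketchSharedBuffer buffer = new SketchSharedBuffer(state);
        try (SketchAccessor accessor = buffer.getAccessor()) {
            accessor.lockNode("node-1");
            accessor.lockNode("node-1");
            accessor.releaseNode("node-1");
        }
        System.out.println("refCount=" + state.get("node-1") + ", writes=" + state.writes);
    }
}
```

In this sketch the three lock/release calls on the same node id result in a 
single write to the store, performed when the accessor is closed.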


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539498#comment-16539498
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Resolved the conflicts; please help review when you are free, @dawidwys.


> Reduce the count to deal with state during a CEP process 
> -
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of sharedBuffer Flink-9418, the lock & release operation is 
> deal with rocksdb state which is different from the previous version which 
> will read the state of sharedBuffer all to memory, i think we can add a cache 
> or variable in sharedbuffer to cache the LockAble Object to mark the ref 
> change in once process in NFA, this will reduce the count when the events 
> point to the same NodeId.. And flush the result to MapState at the end of 
> process. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-07-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535986#comment-16535986
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @Aitozi, I think this PR has conflicts now.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-07-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535553#comment-16535553
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Could you take a look at this PR @dawidwys ?




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524462#comment-16524462
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Does this look OK now? Ping @sihuazhou @dawidwys




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523794#comment-16523794
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Fixed the error where the number of state accesses increased in 
`NFAStateAccessTest` by adding an `isEmpty` check before updating the state.
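
A minimal sketch of the kind of guard described here, assuming a state handle that counts every call as one access. `AccessCountingState` and `GuardedFlushSketch` are hypothetical names for illustration and are not taken from the PR:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical state handle that counts every access (illustration only). */
final class AccessCountingState<K, V> {
    private final Map<K, V> data = new HashMap<>();
    int accessCount;

    void putAll(Map<K, V> batch) throws Exception {
        accessCount++; // even an empty batch is counted as one state access
        data.putAll(batch);
    }
}

final class GuardedFlushSketch {
    /** Writes the cache to state only when the cache actually holds entries. */
    static <K, V> void flush(Map<K, V> cache, AccessCountingState<K, V> state) throws Exception {
        if (!cache.isEmpty()) {
            state.putAll(cache);
        }
    }
}
```

With the guard, flushing an empty cache does not touch the state at all, so an access-counting test sees no extra access for rounds in which nothing was cached.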




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523771#comment-16523771
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Using `entries#putAll` in `flushCache` causes the state access count in 
`NFAStateAccessTest` to increase. I will check it locally; this Travis build 
will fail.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523754#comment-16523754
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198151182
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

Got it.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523733#comment-16523733
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146934
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

In fact, what I mean is that if you want to throw an exception here, you 
could throw it as `throw new RuntimeException("exception message", 
originalException)`. This way the original exception won't be swallowed.
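
As a rough illustration of the suggestion, the sketch below wraps a checked exception thrown inside a `forEach` lambda and passes it on as the cause. `ThrowingStore` and `CauseChainingSketch` are hypothetical stand-ins, not classes from the PR:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a state handle whose put() declares a checked exception. */
interface ThrowingStore<K, V> {
    void put(K key, V value) throws Exception;
}

final class CauseChainingSketch {

    /** Writes every cached entry to the store, wrapping failures without losing the cause. */
    static <K, V> void flush(Map<K, V> cache, ThrowingStore<K, V> store) {
        cache.forEach((k, v) -> {
            try {
                store.put(k, v);
            } catch (Exception e) {
                // Pass the original exception as the cause so its stack trace survives.
                throw new RuntimeException("Failed to flush cache entry for key " + k, e);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new LinkedHashMap<>();
        cache.put("a", 1);
        try {
            flush(cache, (k, v) -> { throw new java.io.IOException("state backend unavailable"); });
        } catch (RuntimeException e) {
            // getCause() returns the IOException that a bare `new RuntimeException()` would have dropped.
            System.out.println(e.getMessage() + " <- " + e.getCause());
        }
    }
}
```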



[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523727#comment-16523727
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146056
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Yes.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523721#comment-16523721
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198143091
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

But I don't know how to deal with the exception in the Java 8 stream API. 
Do you have a better way to deal with this situation? Thanks.
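
One possible answer, sketched under the assumption that the enclosing method already declares `throws Exception` (as `flushCache` does in the diff): iterate with a plain `for` loop instead of a lambda, so the checked exception propagates unchanged and no wrapping is needed. `CheckedStore` and `LoopFlushSketch` are hypothetical names for illustration:

```java
import java.util.Map;

/** Hypothetical stand-in for a state handle whose put() declares a checked exception. */
interface CheckedStore<K, V> {
    void put(K key, V value) throws Exception;
}

final class LoopFlushSketch {

    /**
     * Writes every cached entry to the store. A for-each loop is not a lambda body,
     * so a checked exception from put() simply propagates to the caller.
     */
    static <K, V> void flush(Map<K, V> cache, CheckedStore<K, V> store) throws Exception {
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            store.put(entry.getKey(), entry.getValue());
        }
    }
}
```

The trade-off is purely stylistic: the loop is a little longer than `forEach`, but the caller sees the original exception type rather than a `RuntimeException` wrapper.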




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523718#comment-16523718
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198142501
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

OK. Does `putAll` benefit from the `RocksDBWriteBatchWrapper`?




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523713#comment-16523713
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135717
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

I would suggest using `entries.putAll()` instead, since it could give 
better performance when you are using the RocksDB backend.
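
To make the difference concrete, here is a small sketch that contrasts per-entry `put` calls with a single `putAll`. `CountingStore` is a hypothetical in-memory stand-in that only counts calls; it does not model how any real state backend batches writes:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store that counts calls, standing in for a state handle with put/putAll. */
final class CountingStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    int calls;

    void put(K key, V value) throws Exception {
        calls++;
        data.put(key, value);
    }

    void putAll(Map<K, V> batch) throws Exception {
        calls++; // the whole batch is handed over in one call
        data.putAll(batch);
    }

    int size() {
        return data.size();
    }
}

final class BatchFlushSketch {

    /** Flushes entry by entry: one store call per cached entry. */
    static <K, V> void flushOneByOne(Map<K, V> cache, CountingStore<K, V> store) throws Exception {
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            store.put(entry.getKey(), entry.getValue());
        }
    }

    /** Flushes in one call, leaving any batching to the store implementation. */
    static <K, V> void flushBatched(Map<K, V> cache, CountingStore<K, V> store) throws Exception {
        store.putAll(cache);
    }
}
```

For a cache of three entries, `flushOneByOne` makes three calls and `flushBatched` makes one. Whether that turns into a measurable speed-up depends on the backend.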




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523712#comment-16523712
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136425
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

Same here, do not swallow the original Exception.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523714#comment-16523714
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135796
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
--- End diff --

Same here.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523711#comment-16523711
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136431
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

I would suggest not swallowing the original exception here.
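
One way to keep the original exception, sketched under assumptions: `StateMap` below is a hypothetical stand-in for the state interface (it is not Flink's `MapState`), and `FlushSketch` and its method names are made up for illustration. The first variant keeps the lambda and passes the caught exception on as the cause. The second uses a plain loop, so the checked exception propagates unchanged from a method that already declares `throws Exception`.

```java
import java.util.Map;

/** Hypothetical stand-in for a state map whose put can fail with a checked exception. */
interface StateMap<K, V> {
    void put(K key, V value) throws Exception;
}

class FlushSketch {

    /** Variant 1: keep the lambda, but pass the original exception along as the cause. */
    static <K, V> void flushWithCause(Map<K, V> cache, StateMap<K, V> state) {
        cache.forEach((k, v) -> {
            try {
                state.put(k, v);
            } catch (Exception e) {
                // The second constructor argument preserves the original stack trace.
                throw new RuntimeException("Could not flush key " + k + " to state", e);
            }
        });
    }

    /** Variant 2: a plain loop lets the checked exception propagate unchanged. */
    static <K, V> void flushWithLoop(Map<K, V> cache, StateMap<K, V> state) throws Exception {
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            state.put(entry.getKey(), entry.getValue());
        }
    }
}
```

With either variant the caller still sees the underlying failure, which an empty `new RuntimeException()` would hide.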




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521405#comment-16521405
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
The travis error this time seems unrelated.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521386#comment-16521386
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629310
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put a event to cache.
+* @param eventId id of the event
--- End diff --

fixed




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521384#comment-16521384
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @zhangminglei, thanks for your review. I had only run SharedBufferTest 
locally before. The Travis failure means that the number of state accesses 
(reads and writes) is lower than before, which is the purpose of this PR, 
and I have fixed the error. cc @dawidwys 




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521383#comment-16521383
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629304
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put a event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
--- End diff --

Yes. Thanks for letting me know.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521380#comment-16521380
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629205
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put a event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
--- End diff --

It means `if and only if`.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521381#comment-16521381
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629218
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -75,6 +76,9 @@
private MapState eventsCount;
private MapState> entries;
 
+   private HashMap> eventsBufferCache = new 
HashMap<>();
+   private HashMap> entryCache = new 
HashMap<>();
--- End diff --

agree and fixed




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521365#comment-16521365
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6205
  
Hi @Aitozi, could you please take a look at the Travis error?

```
Tests run: 2, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 0.014 sec 
<<< FAILURE! - in org.apache.flink.cep.nfa.NFAStateAccessTest
testIterativeWithABACPattern(org.apache.flink.cep.nfa.NFAStateAccessTest)  
Time elapsed: 0.009 sec  <<< FAILURE!
java.lang.AssertionError: expected:<20> but was:<8>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.flink.cep.nfa.NFAStateAccessTest.testIterativeWithABACPattern(NFAStateAccessTest.java:193)


testComplexBranchingAfterZeroOrMore(org.apache.flink.cep.nfa.NFAStateAccessTest)
  Time elapsed: 0.005 sec  <<< FAILURE!
java.lang.AssertionError: expected:<5> but was:<2>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.flink.cep.nfa.NFAStateAccessTest.testComplexBranchingAfterZeroOrMore(NFAStateAccessTest.java:112)

Running org.apache.flink.cep.nfa.compiler.NFACompilerTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.019 sec - 
in org.apache.flink.cep.nfa.compiler.NFACompilerTest
Running org.apache.flink.cep.nfa.DeweyNumberTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.01 sec - 
in org.apache.flink.cep.nfa.DeweyNumberTest
Running org.apache.flink.cep.nfa.sharedbuffer.SharedBufferTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec - 
in org.apache.flink.cep.nfa.sharedbuffer.SharedBufferTest
Running org.apache.flink.cep.pattern.PatternTest
Tests run: 22, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.033 sec 
- in org.apache.flink.cep.pattern.PatternTest

Results :

Failed tests: 
  NFAStateAccessTest.testComplexBranchingAfterZeroOrMore:112 expected:<5> 
but was:<2>
  NFAStateAccessTest.testIterativeWithABACPattern:193 expected:<20> but 
was:<8>

Tests run: 80, Failures: 2, Errors: 0, Skipped: 12

13:21:32.852 [INFO] 

13:21:32.852 [INFO] Reactor Summary:
13:21:32.852 [INFO] 
13:21:32.852 [INFO] flink-queryable-state-client-java .. 
SUCCESS [  4.830 s]
13:21:32.852 [INFO] flink-cep .. 
FAILURE [  5.861 s]
13:21:32.852 [INFO] flink-table  
SKIPPED
13:21:32.852 [INFO] flink-queryable-state-runtime .. 
SKIPPED
13:21:32.852 [INFO] flink-gelly  
SKIPPED
13:21:32.852 [INFO] flink-gelly-scala .. 
SKIPPED
13:21:32.852 [INFO] flink-gelly-examples ... 
SKIPPED
13:21:32.852 [INFO] flink-python ... 
SKIPPED
13:21:32.852 [INFO] flink-ml ... 
SKIPPED
13:21:32.852 [INFO] flink-cep-scala  
SKIPPED
```




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521362#comment-16521362
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627451
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -75,6 +76,9 @@
private MapState eventsCount;
private MapState> entries;
 
+   private HashMap> eventsBufferCache = new 
HashMap<>();
+   private HashMap> entryCache = new 
HashMap<>();
--- End diff --

I would suggest changing it to 

`private Map> eventsBufferCache = new HashMap<>();`
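
A small sketch of the suggestion, assuming placeholder type parameters (`String` keys and `Integer` values are illustrative, not the PR's actual types) and a hypothetical class name. Declaring the field against the `Map` interface means only the initializer names the concrete class.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class CacheFieldSketch {
    // Typed as Map: the helper methods below depend only on the interface.
    private Map<String, Integer> eventsBufferCache = new HashMap<>();

    void cacheEvent(String eventId, int refCount) {
        eventsBufferCache.put(eventId, refCount);
    }

    Integer getEvent(String eventId) {
        return eventsBufferCache.get(eventId);
    }

    // Switching to another Map implementation touches only the line that creates it.
    void useInsertionOrder() {
        eventsBufferCache = new LinkedHashMap<>(eventsBufferCache);
    }
}
```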




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521364#comment-16521364
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627422
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put a event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
--- End diff --

iff? I think it should be `if`.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521363#comment-16521363
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627410
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put a event to cache.
+* @param eventId id of the event
--- End diff --

Put an event.




[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-06-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521092#comment-16521092
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6205

[FLINK-9642]Reduce the count to deal with state during a CEP process

## What is the purpose of the change

Since the rework of SharedBuffer in FLINK-9418, lock and release operations go 
directly to the state backend, whereas the previous version read the whole 
SharedBuffer state into memory. I think we can add a cache or variable in 
SharedBuffer that holds the Lockable objects and records reference-count 
changes during a single processing pass of the NFA. This reduces the number of 
state accesses when several events point to the same node. The result is 
flushed to MapState at the end of the pass. 


## Brief change log

- Add the eventsBufferCache and entryCache
- Flush the caches after each processing pass
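
The idea above can be sketched as follows. This is a simplified illustration, not the code in this PR: `CountingState` is a hypothetical stand-in for the state backend that counts reads and writes, and `CachedRefCounts`, with plain `String` ids and `Integer` counts, stands in for the cached Lockable entries.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for keyed state that counts every read and write. */
class CountingState {
    final Map<String, Integer> store = new HashMap<>();
    int accesses = 0;

    Integer get(String key) {
        accesses++;
        return store.get(key);
    }

    void put(String key, Integer value) {
        accesses++;
        store.put(key, value);
    }
}

/** Reference counts are updated in an in-memory cache and written back once per pass. */
class CachedRefCounts {
    private final CountingState state;
    private final Map<String, Integer> cache = new HashMap<>();

    CachedRefCounts(CountingState state) {
        this.state = state;
    }

    /** Read from the cache first and fall back to state only on a miss. */
    private Integer lookup(String id) {
        Integer cached = cache.get(id);
        return cached != null ? cached : state.get(id);
    }

    /** Increment the reference count; on a cache hit this does not touch state. */
    void lock(String id) {
        Integer current = lookup(id);
        cache.put(id, current == null ? 1 : current + 1);
    }

    /** Write every cached entry back to state, then clear the cache. */
    void flushCache() {
        cache.forEach(state::put);
        cache.clear();
    }
}
```

In this sketch, locking the same id three times and then flushing costs two state accesses (one read on the first miss and one write at flush), instead of three reads and three writes without the cache.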

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink onceQueryCache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6205


commit 184ec24474ee5b8a1c9f932286ef4aed4f1dabd6
Author: minwenjun 
Date:   2018-06-23T12:56:55Z

[FLINK-9642]Reduce the count to deal with state during a CEP process



