[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16585165#comment-16585165 ]

Chesnay Schepler commented on FLINK-9642:

[~dawidwys] Can you give this issue a better name?

> Reduce the count to deal with state during a CEP process
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.6.0
> Reporter: aitozi
> Assignee: aitozi
> Priority: Major
> Labels: pull-request-available
>
> With the rework of SharedBuffer in FLINK-9418, the lock and release operations now work directly on RocksDB state. The previous version instead read the whole SharedBuffer state into memory. I think we can add a cache (or a variable) in SharedBuffer that holds the Lockable objects and records their reference-count changes during a single NFA process call. This will reduce the number of state accesses when several events point to the same NodeId. The result is then flushed to MapState at the end of the process call.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
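The caching idea in the description can be illustrated with a small, self-contained sketch. It assumes a `HashMap` wrapped in a hypothetical `CountingStore` stands in for the RocksDB-backed `MapState`, and a plain integer reference count stands in for `Lockable`; `RefCountCache` is likewise a hypothetical name, not Flink API. Lock and release calls within one process call only touch the in-memory map, and `flush()` writes each touched entry back once.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: batch reference-count changes in memory and write them back once per process call. */
class RefCountCache {

    /** Stand-in for a RocksDB-backed MapState that counts how often it is touched. */
    static final class CountingStore {
        final Map<String, Integer> data = new HashMap<>();
        int reads = 0;
        int writes = 0;

        Integer get(String key) {
            reads++;
            return data.get(key);
        }

        void put(String key, int value) {
            writes++;
            data.put(key, value);
        }

        void remove(String key) {
            writes++;
            data.remove(key);
        }
    }

    private final CountingStore store;
    // nodeId -> reference count, loaded from the store on first access within a process call
    private final Map<String, Integer> cache = new HashMap<>();

    RefCountCache(CountingStore store) {
        this.store = store;
    }

    private int current(String nodeId) {
        // Only the first access to a node hits the store; later ones are served from memory.
        return cache.computeIfAbsent(nodeId, id -> {
            Integer stored = store.get(id);
            return stored == null ? 0 : stored;
        });
    }

    void lock(String nodeId) {
        cache.put(nodeId, current(nodeId) + 1);
    }

    void release(String nodeId) {
        cache.put(nodeId, Math.max(0, current(nodeId) - 1));
    }

    /** Writes each touched entry back once; entries whose count reached zero are removed. */
    void flush() {
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            if (e.getValue() == 0) {
                store.remove(e.getKey());
            } else {
                store.put(e.getKey(), e.getValue());
            }
        }
        cache.clear();
    }

    public static void main(String[] args) {
        CountingStore store = new CountingStore();
        RefCountCache buffer = new RefCountCache(store);
        // Three computation states point to the same node during one process call.
        buffer.lock("n1");
        buffer.lock("n1");
        buffer.lock("n1");
        buffer.release("n1");
        buffer.flush();
        System.out.println(store.data.get("n1") + " " + store.reads + " " + store.writes); // 2 1 1
    }
}
```

In this model, the four calls on `n1` cost one store read and one store write; if each call read and wrote the store directly, that would be four of each. The sketch removes an entry once its count reaches zero, which is an assumption rather than a description of Flink's behavior.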
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582664#comment-16582664 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-413578231

Sorry for the slow response. I will try to have a final look next week, but from what I've seen so far it looks quite good.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16582654#comment-16582654 ]

ASF GitHub Bot commented on FLINK-9642:

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-413577032

Hi @dawidwys, could you help review again? Thanks!
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577596#comment-16577596 ]

ASF GitHub Bot commented on FLINK-9642:

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-412344931

The failed Travis CI build seems unrelated to this PR.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577207#comment-16577207 ]

ASF GitHub Bot commented on FLINK-9642:

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-412280321

I have fixed the place you pointed out and added a test that covers changes to elements in both the state and the cache. Please take another look, thanks. @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577090#comment-16577090 ]

ASF GitHub Bot commented on FLINK-9642:

Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209419821

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -161,40 +100,30 @@ public void init(
         eventsCount.putAll(maxIds);
     }
 
-    /**
-     * Stores given value (value + timestamp) under the given state. It assigns a preceding element
-     * relation to the previous entry.
-     *
-     * @param stateName      name of the state that the event should be assigned to
-     * @param eventId        unique id of event assigned by this SharedBuffer
-     * @param previousNodeId id of previous entry (might be null if start of new run)
-     * @param version        Version of the previous relation
-     * @return assigned id of this element
-     * @throws Exception Thrown if the system cannot access the state.
-     */
-    public NodeId put(
-            final String stateName,
-            final EventId eventId,
-            @Nullable final NodeId previousNodeId,
-            final DeweyNumber version) throws Exception {
+    public SharedBufferAccessor<V> getAccessor() {
+        return new SharedBufferAccessor<>(this);
+    }
 
-        if (previousNodeId != null) {
-            lockNode(previousNodeId);
+    public void advanceTime(long timestamp) throws Exception {

Review comment:
This needs to be called from the nfa package, so I think it should be public.
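The diff splits the long-lived `SharedBuffer` from a short-lived `SharedBufferAccessor` returned by `getAccessor()`, and the accessor implements `AutoCloseable`. One way to picture that split is the sketch below. It assumes (this is not confirmed by the diff) that the accessor's `close()` is where cached changes get written back; `Buffer`, `Accessor` and `flushCache()` are hypothetical names, and plain `HashMap`s stand in for Flink state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the accessor pattern: one short-lived accessor per process call, flushed on close. */
class AccessorDemo {

    /** Long-lived buffer that owns the (simulated) persistent state and the per-call cache. */
    static final class Buffer {
        final Map<String, String> state = new HashMap<>();  // stand-in for MapState
        final Map<String, String> cache = new HashMap<>();  // writes buffered during one call

        Accessor getAccessor() {
            return new Accessor(this);
        }

        void flushCache() {
            state.putAll(cache);
            cache.clear();
        }
    }

    /** Short-lived view used during one process call. */
    static final class Accessor implements AutoCloseable {
        private final Buffer buffer;

        Accessor(Buffer buffer) {
            this.buffer = buffer;
        }

        void put(String key, String value) {
            buffer.cache.put(key, value);  // no state access here
        }

        String get(String key) {
            // Read-through: prefer the cache, fall back to state.
            String cached = buffer.cache.get(key);
            return cached != null ? cached : buffer.state.get(key);
        }

        @Override
        public void close() {
            buffer.flushCache();  // assumption: close() is the write-back point
        }
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        try (Accessor accessor = buffer.getAccessor()) {
            accessor.put("e1", "event-1");
            System.out.println(buffer.state.isEmpty()); // true: nothing flushed yet
        }
        System.out.println(buffer.state.get("e1")); // event-1
    }
}
```

With try-with-resources, `close()` runs even when processing throws, so the write-back point is explicit at the call site; whether flushing partial work after a failure is desirable is a separate design question.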
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576269#comment-16576269 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209251987

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -204,227 +133,78 @@ public NodeId put(
      * @throws Exception Thrown if the system cannot access the state.
      */
     public boolean isEmpty() throws Exception {
-        return Iterables.isEmpty(eventsBuffer.keys());
+        return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet());
     }
 
     /**
-     * Returns all elements from the previous relation starting at the given entry.
-     *
-     * @param nodeId  id of the starting entry
-     * @param version Version of the previous relation which shall be extracted
-     * @return Collection of previous relations starting with the given value
-     * @throws Exception Thrown if the system cannot access the state.
+     * Put an event to cache.
+     * @param eventId id of the event
+     * @param event event body
      */
-    public List<Map<String, List<EventId>>> extractPatterns(
-            final NodeId nodeId,
-            final DeweyNumber version) throws Exception {
-
-        List<Map<String, List<EventId>>> result = new ArrayList<>();
-
-        // stack to remember the current extraction states
-        Stack<ExtractionState> extractionStates = new Stack<>();
-
-        // get the starting shared buffer entry for the previous relation
-        Lockable<SharedBufferNode> entryLock = entries.get(nodeId);
-
-        if (entryLock != null) {
-            SharedBufferNode entry = entryLock.getElement();
-            extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-            // use a depth first search to reconstruct the previous relations
-            while (!extractionStates.isEmpty()) {
-                final ExtractionState extractionState = extractionStates.pop();
-                // current path of the depth first search
-                final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
-                final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
-
-                // termination criterion
-                if (currentEntry == null) {
-                    final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
-
-                    while (!currentPath.isEmpty()) {
-                        final NodeId currentPathEntry = currentPath.pop().f0;
-
-                        String page = currentPathEntry.getPageName();
-                        List<EventId> values = completePath
-                            .computeIfAbsent(page, k -> new ArrayList<>());
-                        values.add(currentPathEntry.getEventId());
-                    }
-                    result.add(completePath);
-                } else {
-
-                    // append state to the path
-                    currentPath.push(currentEntry);
-
-                    boolean firstMatch = true;
-                    for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
-                        // we can only proceed if the current version is compatible to the version
-                        // of this previous relation
-                        final DeweyNumber currentVersion = extractionState.getVersion();
-                        if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-                            final NodeId target = edge.getTarget();
-                            Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
-
-                            if (firstMatch) {
-                                // for the first match we don't have to copy the current path
-                                newPath = currentPath;
-                                firstMatch = false;
-                            } else {
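The removed `extractPatterns` walks predecessor edges depth-first, following only edges whose Dewey version is compatible with the current one, and turns each complete path into a map from page (state) name to event ids. A simplified, self-contained sketch of that traversal follows. It assumes nodes are keyed by strings, versions are `int[]` arrays, and an edge with a `null` target marks the start of a run; `Node`, `Edge`, `Step` and `isCompatible` are hypothetical stand-ins for `SharedBufferNode`, `SharedBufferEdge`, `ExtractionState` and `DeweyNumber.isCompatibleWith`. The compatibility rule used here (this version extends the other as a prefix, or both have the same length, equal leading digits, and this last digit is greater than or equal) is a reading of Flink's `DeweyNumber` and should be treated as an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified sketch of depth-first match extraction over a versioned shared buffer. */
class PatternExtraction {

    /** Edge to a predecessor node; a null target marks the start of a run. */
    static final class Edge {
        final String target;
        final int[] version;
        Edge(String target, int... version) {
            this.target = target;
            this.version = version;
        }
    }

    /** A buffered event: the pattern state ("page") it matched and its predecessor edges. */
    static final class Node {
        final String page;
        final String eventId;
        final List<Edge> edges;
        Node(String page, String eventId, Edge... edges) {
            this.page = page;
            this.eventId = eventId;
            this.edges = Arrays.asList(edges);
        }
    }

    /** Assumed Dewey rule: a extends b as a prefix, or same length, same leading digits and a's last digit >= b's. */
    static boolean isCompatible(int[] a, int[] b) {
        if (a.length > b.length) {
            return Arrays.equals(Arrays.copyOf(a, b.length), b);
        }
        if (a.length == b.length && a.length > 0) {
            int last = a.length - 1;
            return Arrays.equals(Arrays.copyOf(a, last), Arrays.copyOf(b, last)) && a[last] >= b[last];
        }
        return false;
    }

    /** One pending step of the search: node to visit, version reached, node ids collected so far. */
    private static final class Step {
        final String nodeId;
        final int[] version;
        final List<String> path;
        Step(String nodeId, int[] version, List<String> path) {
            this.nodeId = nodeId;
            this.version = version;
            this.path = path;
        }
    }

    static List<Map<String, List<String>>> extract(Map<String, Node> nodes, String startId, int[] version) {
        List<Map<String, List<String>>> result = new ArrayList<>();
        Deque<Step> stack = new ArrayDeque<>();
        stack.push(new Step(startId, version, new ArrayList<>()));
        while (!stack.isEmpty()) {
            Step step = stack.pop();
            if (step.nodeId == null) {
                // Start of the run reached; the path runs newest to oldest, so emit it reversed.
                Map<String, List<String>> match = new LinkedHashMap<>();
                for (int i = step.path.size() - 1; i >= 0; i--) {
                    Node n = nodes.get(step.path.get(i));
                    match.computeIfAbsent(n.page, k -> new ArrayList<>()).add(n.eventId);
                }
                result.add(match);
                continue;
            }
            for (Edge edge : nodes.get(step.nodeId).edges) {
                // Only follow predecessor relations whose version is compatible with the current one.
                if (isCompatible(step.version, edge.version)) {
                    List<String> path = new ArrayList<>(step.path);  // copy so branches do not share a path
                    path.add(step.nodeId);
                    stack.push(new Step(edge.target, edge.version, path));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Node> nodes = new HashMap<>();
        nodes.put("a1", new Node("a", "e1", new Edge(null, 1)));
        nodes.put("b1", new Node("b", "e2", new Edge("a1", 1, 0)));
        nodes.put("b2", new Node("b", "e3", new Edge("b1", 1, 0)));
        System.out.println(extract(nodes, "b2", new int[] {1, 0})); // [{a=[e1], b=[e2, e3]}]
    }
}
```

Unlike the removed code, which reuses the current path for the first compatible edge and copies it only for later ones, this sketch copies the path for every branch, trading a little allocation for simplicity.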
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576268#comment-16576268 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249252

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -204,227 +133,78 @@ public NodeId put(
      * @throws Exception Thrown if the system cannot access the state.
      */
     public boolean isEmpty() throws Exception {
-        return Iterables.isEmpty(eventsBuffer.keys());
+        return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet());

Review comment:
Check the cache first. If there is something in the cache, we won't need to access the state.
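The reviewer's point relies on short-circuit evaluation: Java's `&&` skips its right operand when the left one is `false`, so putting the in-memory cache check first avoids the state lookup whenever the cache holds anything. A minimal sketch, reusing the field names `eventsBuffer` and `eventsBufferCache` from the diff but with plain `HashMap`s and a hypothetical `stateLookups` counter in place of Flink's `MapState`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: evaluate the cheap in-memory check first so && can skip the state lookup. */
class IsEmptyOrder {

    final Map<String, String> eventsBufferCache = new HashMap<>();
    final Map<String, String> eventsBuffer = new HashMap<>();  // stand-in for the RocksDB-backed MapState
    int stateLookups = 0;

    private boolean stateIsEmpty() {
        stateLookups++;  // with a real state backend this would iterate keys from RocksDB
        return eventsBuffer.isEmpty();
    }

    /** Suggested order: a non-empty cache short-circuits the state access. */
    boolean isEmpty() {
        return eventsBufferCache.isEmpty() && stateIsEmpty();
    }

    public static void main(String[] args) {
        IsEmptyOrder buffer = new IsEmptyOrder();
        buffer.eventsBufferCache.put("e1", "event");
        System.out.println(buffer.isEmpty() + " " + buffer.stateLookups); // false 0
    }
}
```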
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576254#comment-16576254 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234513

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
##
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+    /** The cache of sharedBuffer.*/
+    private SharedBuffer<V> sharedBuffer;
+
+    public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+        this.sharedBuffer = sharedBuffer;
+    }
+
+    public void setSharedBuffer(SharedBuffer<V> sharedBuffer) {
+        this.sharedBuffer = sharedBuffer;
+    }
+
+    /**
+     * Notifies shared buffer that there will be no events with timestamp the given value. It allows to clear
+     * internal counters for number of events seen so far per timestamp.
+     *
+     * @param timestamp watermark, no earlier events will arrive
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public void advanceTime(long timestamp) throws Exception {
+        sharedBuffer.advanceTime(timestamp);
+    }
+
+    /**
+     * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+     * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+     * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+     *
+     * NOTE: Should be called only once for each unique event!
+     *
+     * @param value event to be registered
+     * @return unique id of that event that should be used when putting entries to the buffer.
+     * @throws Exception Thrown if the system cannot access the state.
+     */
+    public EventId registerEvent(V value, long timestamp) throws Exception {
+        return sharedBuffer.registerEvent(value, timestamp);
+    }
+
+    /**
+     * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+     * relation to the previous entry.
+     *
+     * @param stateName name of the state that the
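The `registerEvent`/`advanceTime` Javadoc describes unique event ids backed by per-timestamp counters that can be cleared once the watermark passes. A simplified sketch under these assumptions: an id is the pair (counter, timestamp), values are `String`s, the lock that `registerEvent` creates is omitted, and `EventIdAssigner` plus its nested `EventId` are hypothetical stand-ins rather than Flink's classes. The sketch conservatively drops counters only for timestamps strictly below the watermark; that boundary is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: unique event ids built from a timestamp and a per-timestamp counter. */
class EventIdAssigner {

    /** Simplified stand-in for an event id: the n-th event registered with a given timestamp. */
    static final class EventId {
        final int id;
        final long timestamp;

        EventId(int id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof EventId)) {
                return false;
            }
            EventId other = (EventId) o;
            return id == other.id && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, timestamp);
        }

        @Override
        public String toString() {
            return "EventId{id=" + id + ", timestamp=" + timestamp + "}";
        }
    }

    // timestamp -> number of events registered with that timestamp so far
    final Map<Long, Integer> eventsCount = new HashMap<>();
    // each event body is stored once, under its unique id
    final Map<EventId, String> eventsBuffer = new HashMap<>();

    EventId registerEvent(String value, long timestamp) {
        int id = eventsCount.merge(timestamp, 1, Integer::sum) - 1;
        EventId eventId = new EventId(id, timestamp);
        eventsBuffer.put(eventId, value);
        return eventId;
    }

    /** Counters for timestamps below the watermark are no longer needed, so they are dropped. */
    void advanceTime(long watermark) {
        eventsCount.keySet().removeIf(ts -> ts < watermark);
    }

    public static void main(String[] args) {
        EventIdAssigner buffer = new EventIdAssigner();
        System.out.println(buffer.registerEvent("a", 10L)); // EventId{id=0, timestamp=10}
        System.out.println(buffer.registerEvent("b", 10L)); // EventId{id=1, timestamp=10}
        System.out.println(buffer.registerEvent("c", 11L)); // EventId{id=0, timestamp=11}
        buffer.advanceTime(11L);
        System.out.println(buffer.eventsCount); // {11=1}
    }
}
```

Dropping a counter is only safe because of the watermark guarantee: if a late event with an already-cleared timestamp did arrive, its counter would restart at zero and could collide with an existing id.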
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576262#comment-16576262 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209248594

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -204,227 +133,78 @@ public NodeId put(
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576266#comment-16576266 ]

ASF GitHub Bot commented on FLINK-9642:

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234729

##
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
##
@@ -0,0 +1,362 @@
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576252#comment-16576252 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233907 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOVICE file + * distributed with this work for additional information + * regarding copyright ownership. Vhe ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable{ + + /** The cache of sharedBuffer.*/ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp &lt;&eq; the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE: Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the
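The `registerEvent` javadoc describes a reference-counting lifecycle: a new event starts with one lock so it survives its own processing step, every buffer entry pointing at it adds a lock, and it may be removed only once all locks are released. A minimal sketch of that lifecycle, assuming an in-memory map instead of Flink state; `RefCounted` and `RefCountedStore` are hypothetical names loosely modeled on the `Lockable` wrapper the issue mentions, not its real API:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical value-plus-reference-count wrapper. */
final class RefCounted<V> {
    final V value;
    private int refCount;

    RefCounted(V value, int initialRefCount) {
        this.value = value;
        this.refCount = initialRefCount;
    }

    void lock() {
        refCount++;
    }

    /** Returns true when the last reference has been released. */
    boolean release() {
        if (refCount <= 0) {
            throw new IllegalStateException("release without a matching lock");
        }
        refCount--;
        return refCount == 0;
    }
}

/** Hypothetical store: an entry disappears once its count reaches zero. */
final class RefCountedStore<K, V> {
    private final Map<K, RefCounted<V>> entries = new HashMap<>();

    /** Registers a value that already holds one lock, as the javadoc describes. */
    void register(K key, V value) {
        entries.put(key, new RefCounted<>(value, 1));
    }

    /** Assumes the key was registered before. */
    void lock(K key) {
        entries.get(key).lock();
    }

    void release(K key) {
        RefCounted<V> entry = entries.get(key);
        if (entry != null && entry.release()) {
            entries.remove(key);
        }
    }

    boolean contains(K key) {
        return entries.containsKey(key);
    }
}
```

In this model the caller, not the store, is responsible for releasing the initial lock once every computation state of the current event has been processed.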
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576249#comment-16576249 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233807 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576259#comment-16576259 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209248515 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List<Map<String, List<EventId>>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List<Map<String, List<EventId>>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack<ExtractionState> extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable<SharedBufferNode> entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath(); - final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map<String, List<EventId>> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List<EventId> values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack<Tuple2<NodeId, SharedBufferNode>> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else {
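The removed `extractPatterns` walks the buffer backwards with an explicit stack: at each node it follows only the edges whose Dewey number is compatible with the version it carries, the first compatible edge reuses the current path while later ones copy it, and a `null` target marks a complete match. A simplified, self-contained sketch of that traversal, assuming string node ids of the form `page:event` and `int[]` Dewey numbers; `Edge`, `ExtractionSketch`, and the compatibility rule written here are hypothetical simplifications, not Flink's `DeweyNumber` or `SharedBufferNode` classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical edge to an older node; a null target marks the start of a match. */
final class Edge {
    final String target;
    final int[] version;

    Edge(String target, int... version) {
        this.target = target;
        this.version = version;
    }
}

final class ExtractionSketch {

    /** One pending step of the depth-first search. */
    private static final class State {
        final String node;        // null once a complete match has been reached
        final int[] version;      // Dewey number carried along this branch
        final List<String> path;  // nodes in visiting order (latest event first)

        State(String node, int[] version, List<String> path) {
            this.node = node;
            this.version = version;
            this.path = path;
        }
    }

    private static boolean samePrefix(int[] a, int[] b, int n) {
        for (int i = 0; i < n; i++) {
            if (a[i] != b[i]) {
                return false;
            }
        }
        return true;
    }

    /** Assumed rule: a longer version must extend the edge's number; an equal-length
     *  one must match all but the last digit and have a last digit that is not smaller. */
    static boolean isCompatible(int[] current, int[] edge) {
        if (current.length > edge.length) {
            return samePrefix(current, edge, edge.length);
        }
        if (current.length == edge.length && current.length > 0) {
            int last = current.length - 1;
            return samePrefix(current, edge, last) && current[last] >= edge[last];
        }
        return false;
    }

    static List<Map<String, List<String>>> extract(
            Map<String, List<Edge>> edges, String start, int[] version) {
        List<Map<String, List<String>>> result = new ArrayList<>();
        Deque<State> pending = new ArrayDeque<>();
        pending.push(new State(start, version, new ArrayList<>()));

        while (!pending.isEmpty()) {
            State state = pending.pop();
            if (state.node == null) {
                // complete match: replay from the earliest event, grouped by page
                Map<String, List<String>> match = new LinkedHashMap<>();
                for (int i = state.path.size() - 1; i >= 0; i--) {
                    String[] parts = state.path.get(i).split(":", 2);
                    match.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
                }
                result.add(match);
                continue;
            }
            state.path.add(state.node);
            boolean firstMatch = true;
            for (Edge edge : edges.getOrDefault(state.node, Collections.emptyList())) {
                if (!isCompatible(state.version, edge.version)) {
                    continue;
                }
                // the first compatible edge reuses the path; later ones get a copy
                List<String> path = firstMatch ? state.path : new ArrayList<>(state.path);
                firstMatch = false;
                pending.push(new State(edge.target, edge.version, path));
            }
        }
        return result;
    }
}
```

Sharing the path with the first branch is safe because every sibling copy is taken inside the same loop, before any branch is popped and starts appending to it.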
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576261#comment-16576261 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209251047 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put(
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576250#comment-16576250 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233134 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576270#comment-16576270 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209249647 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -25,58 +25,37 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.cep.nfa.DeweyNumber; -import org.apache.flink.util.WrappingRuntimeException; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; -import org.apache.commons.lang3.StringUtils; - -import javax.annotation.Nullable; - -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Stack; import java.util.stream.Collectors; -import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; -import static org.apache.flink.util.Preconditions.checkState; - /** - * A shared buffer implementation which stores values under according state. Additionally, the values can be - * versioned such that it is possible to retrieve their predecessor element in the buffer. - * - * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way - * we do not need to deserialize events during processing and we store only one copy of the event. - * - * The entries in {@link SharedBuffer} are {@link SharedBufferNode}. The shared buffer node allows to store - * relations between different entries. 
A dewey versioning scheme allows to discriminate between - * different relations (e.g. preceding element). - * - * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". - * - * @param <V> Type of the values - * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> - * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + * This is a helper class of {@link SharedBufferAccessor}. It do the cache of the underlay sharedBuffer state + * during a nfa process. It can reduce the state access when the ref change is requested several times on + * a same {@code Lockable} Object. And it also implements the {@code AutoCloseable} interface to flush the Review comment: Outdated javadoc. It is no longer AutoCloseable.
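The issue proposes that, instead of reading and writing RocksDB-backed state on every lock and release, the reference-count changes made during one NFA step be accumulated in memory and written once at the end of the step. A minimal sketch of that write-back pattern, assuming a `HashMap` that counts its accesses stands in for `MapState`; `CountingStore` and `RefCountCache` are hypothetical names, and, following the review comment, flushing is an explicit `flush()` call rather than `AutoCloseable`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for MapState that counts every access. */
final class CountingStore {
    final Map<String, Integer> refCounts = new HashMap<>();
    int reads;
    int writes;

    Integer get(String key) {
        reads++;
        return refCounts.get(key);
    }

    void put(String key, int count) {
        writes++;
        refCounts.put(key, count);
    }

    void remove(String key) {
        writes++;
        refCounts.remove(key);
    }
}

/** Hypothetical cache that coalesces reference-count changes until flush(). */
final class RefCountCache {
    private final CountingStore store;
    private final Map<String, Integer> cached = new HashMap<>();

    RefCountCache(CountingStore store) {
        this.store = store;
    }

    /** Reads through to the store only the first time a key is touched. */
    private int current(String key) {
        return cached.computeIfAbsent(key, k -> {
            Integer stored = store.get(k);
            return stored == null ? 0 : stored;
        });
    }

    void lock(String key) {
        cached.put(key, current(key) + 1);
    }

    void release(String key) {
        cached.put(key, current(key) - 1);
    }

    /** Like the reviewed isEmpty(), consults both the store and the unflushed cache. */
    boolean isEmpty() {
        return store.refCounts.isEmpty() && cached.isEmpty();
    }

    /** Writes every touched key once; counts that dropped to zero are deleted. */
    void flush() {
        for (Map.Entry<String, Integer> entry : cached.entrySet()) {
            if (entry.getValue() <= 0) {
                store.remove(entry.getKey());
            } else {
                store.put(entry.getKey(), entry.getValue());
            }
        }
        cached.clear();
    }
}
```

In this model, five locks and one release on the same key cost one store read and one store write, where a naive read-modify-write per call would need six of each; the real saving in the PR depends on how often one NFA step touches the same node.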
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576245#comment-16576245 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209232519 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". + * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable{ + + /** The cache of sharedBuffer.*/ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { Review comment: This method is unnecessary. 
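The reviewer's point is that the accessor never needs to be re-pointed at a different buffer, so the reference can be fixed at construction time. A minimal sketch of that shape, assuming the accessor only delegates calls and flushes the buffer's cache when it is closed; `CachedBuffer`, `RecordingBuffer`, and `BufferAccessor` are hypothetical names, not Flink's `SharedBuffer`/`SharedBufferAccessor` API:

```java
import java.util.Objects;

/** Hypothetical buffer whose cached changes must be flushed after each step. */
interface CachedBuffer {
    void advanceTime(long timestamp);

    void flushCache();
}

/** Hypothetical in-memory buffer that records what the accessor did. */
final class RecordingBuffer implements CachedBuffer {
    long watermark = Long.MIN_VALUE;
    int flushes;

    @Override
    public void advanceTime(long timestamp) {
        watermark = Math.max(watermark, timestamp);
    }

    @Override
    public void flushCache() {
        flushes++;
    }
}

/** Bound to one buffer for its whole lifetime, so no setter is needed. */
final class BufferAccessor implements AutoCloseable {
    private final CachedBuffer buffer;

    BufferAccessor(CachedBuffer buffer) {
        this.buffer = Objects.requireNonNull(buffer, "buffer");
    }

    void advanceTime(long timestamp) {
        buffer.advanceTime(timestamp);
    }

    /** Narrows AutoCloseable.close() so callers need not catch Exception. */
    @Override
    public void close() {
        buffer.flushCache();
    }
}
```

Declaring `close()` without `throws Exception` lets a try-with-resources block compile without a catch clause; an accessor whose flush actually touches state would likely need to declare or wrap that exception.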
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576257#comment-16576257 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209248308 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put(
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576263#comment-16576263 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209248405 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put(
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576255#comment-16576255 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209252113 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List<Map<String, List<EventId>>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List<Map<String, List<EventId>>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack<ExtractionState> extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable<SharedBufferNode> entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath(); - final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map<String, List<EventId>> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List<EventId> values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack<Tuple2<NodeId, SharedBufferNode>> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else {
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576248#comment-16576248 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233765 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable{ + + /** The cache of sharedBuffer.*/ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE:Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the
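The `advanceTime` and `registerEvent` javadocs above describe unique event ids built from per-timestamp counters, which can be discarded once the watermark guarantees no more events for that timestamp. A minimal sketch under stated assumptions: `EventIdSketch`, the `EventId(id, timestamp)` pair, the `trackedTimestamps`/`getEvent` helpers and pruning counters strictly older than the watermark are all hypothetical choices, not the PR's code, and the lock that `registerEvent` takes on the event is not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch: unique event ids from per-timestamp counters, pruned by the watermark. */
public class EventIdSketch<V> {

    /** An event's timestamp plus its arrival index among events sharing that timestamp. */
    static final class EventId {
        final int id;
        final long timestamp;

        EventId(int id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof EventId)) {
                return false;
            }
            EventId other = (EventId) o;
            return id == other.id && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, timestamp);
        }

        @Override
        public String toString() {
            return "EventId(" + id + ", " + timestamp + ")";
        }
    }

    /** Events seen so far per timestamp; only needed while more events with that timestamp may arrive. */
    private final Map<Long, Integer> eventsCount = new HashMap<>();
    /** Each event body is stored once, under its id. */
    private final Map<EventId, V> eventsBuffer = new HashMap<>();

    /** Should be called once per unique event. */
    EventId registerEvent(V value, long timestamp) {
        // merge() returns the updated count, so the first event at a timestamp gets id 0.
        int id = eventsCount.merge(timestamp, 1, Integer::sum) - 1;
        EventId eventId = new EventId(id, timestamp);
        eventsBuffer.put(eventId, value);
        return eventId;
    }

    /** Assumption: events older than the watermark no longer arrive, so their counters can go. */
    void advanceTime(long watermark) {
        eventsCount.keySet().removeIf(ts -> ts < watermark);
    }

    int trackedTimestamps() {
        return eventsCount.size();
    }

    V getEvent(EventId eventId) {
        return eventsBuffer.get(eventId);
    }
}
```

Pruning only removes counters, not buffered events. It is safe only under the watermark assumption: a late event at an already-pruned timestamp would restart that counter at 0 and could collide with an existing id.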
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576251#comment-16576251 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209233285 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable{ + + /** The cache of sharedBuffer.*/ + private SharedBuffer<V> sharedBuffer; + + public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + public void setSharedBuffer(SharedBuffer<V> sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + /** +* Notifies shared buffer that there will be no events with timestamp the given value. It allows to clear +* internal counters for number of events seen so far per timestamp. +* +* @param timestamp watermark, no earlier events will arrive +* @throws Exception Thrown if the system cannot access the state. +*/ + public void advanceTime(long timestamp) throws Exception { + sharedBuffer.advanceTime(timestamp); + } + + /** +* Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a +* lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed +* after processing all {@link org.apache.flink.cep.nfa.ComputationState}s +* +* NOTE:Should be called only once for each unique event! +* +* @param value event to be registered +* @return unique id of that event that should be used when putting entries to the buffer. +* @throws Exception Thrown if the system cannot access the state. +*/ + public EventId registerEvent(V value, long timestamp) throws Exception { + return sharedBuffer.registerEvent(value, timestamp); + } + + /** +* Stores given value (value + timestamp) under the given state. It assigns a preceding element +* relation to the previous entry. +* +* @param stateName name of the state that the
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576265#comment-16576265 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209249984 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". + * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ Review comment: Let's move this description to `SharedBuffer` and replace it with sth along the lines: Accessor to SharedBuffer that allows operations on the underlying structures in batches. Operations are persisted only after closing the Accessor. This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reduce the count to deal with state during a CEP process > - > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.6.0 >Reporter: aitozi >Assignee: aitozi >Priority: Major > Labels: pull-request-available > > With the rework of sharedBuffer Flink-9418, the lock & release operation is > deal with rocksdb state which is different from the previous version which > will read the state of sharedBuffer all to memory, i think we can add a cache > or variable in sharedbuffer to cache the LockAble Object to mark the ref > change in once process in NFA, this will reduce the count when the events > point to the same NodeId.. And flush the result to MapState at the end of > process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
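The reviewer's suggested description (an accessor that operates on the underlying structures in batches and persists changes only when closed) maps naturally onto `AutoCloseable` with try-with-resources. The following is a hypothetical sketch, not the PR's `SharedBufferAccessor`: `Store`, `Accessor` and the `writes` counter are illustrative names, a `HashMap` stands in for Flink keyed state, and `getAccessor()` only mirrors the method name in the diff.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of an accessor that batches changes and persists them on close(). */
public class AccessorSketch {

    /** Stand-in for durable keyed state; counts writes so the batching is observable. */
    static final class Store {
        final Map<String, String> data = new HashMap<>();
        int writes = 0;

        void put(String key, String value) {
            writes++;
            data.put(key, value);
        }

        /** Returns a fresh accessor; callers are expected to use try-with-resources. */
        Accessor getAccessor() {
            return new Accessor(this);
        }
    }

    /** Collects changes in memory; nothing reaches the store until close(). */
    static final class Accessor implements AutoCloseable {
        private final Store store;
        private final Map<String, String> pending = new HashMap<>();

        Accessor(Store store) {
            this.store = store;
        }

        void put(String key, String value) {
            pending.put(key, value); // a later write to the same key replaces the earlier one
        }

        String get(String key) {
            // Pending changes shadow the store, so a step sees its own writes.
            return pending.containsKey(key) ? pending.get(key) : store.data.get(key);
        }

        @Override
        public void close() {
            pending.forEach(store::put); // one store write per distinct key
            pending.clear();
        }
    }
}
```

Because try-with-resources calls `close()` even when the body throws, this sketch also persists a partially processed batch on failure; whether that is acceptable for fault-tolerant state is a design decision the sketch does not settle.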
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576264#comment-16576264 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209253795 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -161,40 +100,30 @@ public void init( eventsCount.putAll(maxIds); } - /** -* Stores given value (value + timestamp) under the given state. It assigns a preceding element -* relation to the previous entry. -* -* @param stateName name of the state that the event should be assigned to -* @param eventId unique id of event assigned by this SharedBuffer -* @param previousNodeId id of previous entry (might be null if start of new run) -* @param version Version of the previous relation -* @return assigned id of this element -* @throws Exception Thrown if the system cannot access the state. -*/ - public NodeId put( - final String stateName, - final EventId eventId, - @Nullable final NodeId previousNodeId, - final DeweyNumber version) throws Exception { + public SharedBufferAccessor<V> getAccessor() { + return new SharedBufferAccessor<>(this); + } - if (previousNodeId != null) { - lockNode(previousNodeId); + public void advanceTime(long timestamp) throws Exception { Review comment: This could be package protected.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576256#comment-16576256 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209250690 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List<Map<String, List<EventId>>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List<Map<String, List<EventId>>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack<ExtractionState> extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable<SharedBufferNode> entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath(); - final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map<String, List<EventId>> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List<EventId> values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack<Tuple2<NodeId, SharedBufferNode>> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else {
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576253#comment-16576253 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209235080 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ## @@ -373,10 +373,11 @@ private void updateNFA(NFAState nfaState) throws IOException { * @param nfaState Our NFAState object * @param event The current event to be processed * @param timestamp The timestamp of the event +* @param sharedBuffer The sharedBuffer of the process */ - private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception { + private void processEvent(NFAState nfaState, IN event, long timestamp, SharedBuffer sharedBuffer) throws Exception { Review comment: I see no reason for this change.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576246#comment-16576246 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209232912 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cep.nfa.sharedbuffer; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cep.nfa.DeweyNumber; +import org.apache.flink.util.WrappingRuntimeException; + +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A shared buffer implementation which stores values under according state. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * + * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way + * we do not need to deserialize events during processing and we store only one copy of the event. + * + * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". + * + * @param <V> Type of the values + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf"> + * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + */ +public class SharedBufferAccessor<V> implements AutoCloseable{ + + /** The cache of sharedBuffer.*/ Review comment: Let's not call it cache. It is a fault-tolerant structure. It is the `SharedBuffer` responsibility that it does the caching.
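The reviewer places caching inside `SharedBuffer`, and the issue proposes caching `Lockable` reference-count changes during one NFA step and flushing them to `MapState` at the end, so that many events pointing to the same `NodeId` cost fewer state accesses. A hypothetical sketch of that coalescing follows: `BackingState` is a `HashMap` that counts every read and write in place of RocksDB-backed state, `lock`, `release` and `flush` are illustrative names, and `isEmpty` mirrors the diff's check of both the backing state and the cache.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: buffer ref-count changes per node in memory, write them back once. */
public class RefCountCacheSketch {

    /** Stand-in for a RocksDB-backed MapState; every get/put/remove counts as one access. */
    static final class BackingState {
        final Map<String, Integer> refCounts = new HashMap<>();
        int accesses = 0;

        Integer get(String nodeId) {
            accesses++;
            return refCounts.get(nodeId);
        }

        void put(String nodeId, int count) {
            accesses++;
            refCounts.put(nodeId, count);
        }

        void remove(String nodeId) {
            accesses++;
            refCounts.remove(nodeId);
        }
    }

    private final BackingState state;
    /** Ref counts of nodes touched during the current NFA step. */
    private final Map<String, Integer> cache = new HashMap<>();

    RefCountCacheSketch(BackingState state) {
        this.state = state;
    }

    /** Reads a node's count from the backing state at most once per step. */
    private int current(String nodeId) {
        Integer cached = cache.get(nodeId);
        if (cached != null) {
            return cached;
        }
        Integer stored = state.get(nodeId);
        return stored == null ? 0 : stored;
    }

    void lock(String nodeId) {
        cache.put(nodeId, current(nodeId) + 1);
    }

    void release(String nodeId) {
        cache.put(nodeId, current(nodeId) - 1);
    }

    /** Writes each touched node once; nodes whose count reached zero are removed. */
    void flush() {
        for (Map.Entry<String, Integer> e : cache.entrySet()) {
            if (e.getValue() <= 0) {
                state.remove(e.getKey());
            } else {
                state.put(e.getKey(), e.getValue());
            }
        }
        cache.clear();
    }

    /** Empty only if neither the backing state nor the cache holds anything. */
    boolean isEmpty() {
        return state.refCounts.isEmpty() && cache.isEmpty();
    }
}
```

In the test scenario, five `lock` calls and one `release` on the same node cost one backend read and, at flush, one write, where a write-through version would touch the state on every call.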
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576258#comment-16576258 ] ASF GitHub Bot commented on FLINK-9642: --- dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process URL: https://github.com/apache/flink/pull/6205#discussion_r209248958 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ## @@ -204,227 +133,78 @@ public NodeId put( * @throws Exception Thrown if the system cannot access the state. */ public boolean isEmpty() throws Exception { - return Iterables.isEmpty(eventsBuffer.keys()); + return Iterables.isEmpty(eventsBuffer.keys()) && Iterables.isEmpty(eventsBufferCache.keySet()); } /** -* Returns all elements from the previous relation starting at the given entry. -* -* @param nodeId id of the starting entry -* @param version Version of the previous relation which shall be extracted -* @return Collection of previous relations starting with the given value -* @throws Exception Thrown if the system cannot access the state. +* Put an event to cache. 
+* @param eventId id of the event +* @param event event body */ - public List<Map<String, List<EventId>>> extractPatterns( - final NodeId nodeId, - final DeweyNumber version) throws Exception { - - List<Map<String, List<EventId>>> result = new ArrayList<>(); - - // stack to remember the current extraction states - Stack<ExtractionState> extractionStates = new Stack<>(); - - // get the starting shared buffer entry for the previous relation - Lockable<SharedBufferNode> entryLock = entries.get(nodeId); - - if (entryLock != null) { - SharedBufferNode entry = entryLock.getElement(); - extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>())); - - // use a depth first search to reconstruct the previous relations - while (!extractionStates.isEmpty()) { - final ExtractionState extractionState = extractionStates.pop(); - // current path of the depth first search - final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath(); - final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry(); - - // termination criterion - if (currentEntry == null) { - final Map<String, List<EventId>> completePath = new LinkedHashMap<>(); - - while (!currentPath.isEmpty()) { - final NodeId currentPathEntry = currentPath.pop().f0; - - String page = currentPathEntry.getPageName(); - List<EventId> values = completePath - .computeIfAbsent(page, k -> new ArrayList<>()); - values.add(currentPathEntry.getEventId()); - } - result.add(completePath); - } else { - - // append state to the path - currentPath.push(currentEntry); - - boolean firstMatch = true; - for (SharedBufferEdge edge : currentEntry.f1.getEdges()) { - // we can only proceed if the current version is compatible to the version - // of this previous relation - final DeweyNumber currentVersion = extractionState.getVersion(); - if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) { - final NodeId target = edge.getTarget(); - Stack<Tuple2<NodeId, SharedBufferNode>> newPath; - - if (firstMatch) { - // for the first match we don't have to copy the current path - newPath = currentPath; - firstMatch = false; - } else {
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576267#comment-16576267 ]
ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209249390

## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
##
@@ -161,40 +100,30 @@ public void init(
 		eventsCount.putAll(maxIds);
 	}
 
-	/**
-	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
-	 * relation to the previous entry.
-	 *
-	 * @param stateName      name of the state that the event should be assigned to
-	 * @param eventId        unique id of event assigned by this SharedBuffer
-	 * @param previousNodeId id of previous entry (might be null if start of new run)
-	 * @param version        Version of the previous relation
-	 * @return assigned id of this element
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public NodeId put(
-			final String stateName,
-			final EventId eventId,
-			@Nullable final NodeId previousNodeId,
-			final DeweyNumber version) throws Exception {
+	public SharedBufferAccessor<V> getAccessor() {

Review comment:
Missing javadoc
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576260#comment-16576260 ]
ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209234589

## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
##
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+	/** The cache of sharedBuffer.*/
+	private SharedBuffer<V> sharedBuffer;
+
+	public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+		this.sharedBuffer = sharedBuffer;
+	}
+
+	public void setSharedBuffer(SharedBuffer<V> sharedBuffer) {
+		this.sharedBuffer = sharedBuffer;
+	}
+
+	/**
+	 * Notifies shared buffer that there will be no events with timestamp <= the given value. It allows to clear
+	 * internal counters for number of events seen so far per timestamp.
+	 *
+	 * @param timestamp watermark, no earlier events will arrive
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void advanceTime(long timestamp) throws Exception {
+		sharedBuffer.advanceTime(timestamp);
+	}
+
+	/**
+	 * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+	 * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+	 * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+	 *
+	 * NOTE: Should be called only once for each unique event!
+	 *
+	 * @param value event to be registered
+	 * @return unique id of that event that should be used when putting entries to the buffer.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public EventId registerEvent(V value, long timestamp) throws Exception {
+		return sharedBuffer.registerEvent(value, timestamp);
+	}
+
+	/**
+	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+	 * relation to the previous entry.
+	 *
+	 * @param stateName name of the state that the
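The `advanceTime` and `registerEvent` Javadoc above describe a per-timestamp counter. Each event id is unique within its timestamp, a newly registered event starts with one lock so it survives the current round, and counters for timestamps below the watermark can be dropped. A minimal sketch under assumptions: `SimpleEventId`, `LockedValue`, and `EventRegistry` are hypothetical stand-ins for Flink's `EventId`, `Lockable`, and `SharedBuffer`, and `HashMap`s replace keyed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for EventId: (per-timestamp counter, timestamp). */
final class SimpleEventId {
    final int id;
    final long timestamp;

    SimpleEventId(int id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleEventId)) {
            return false;
        }
        SimpleEventId other = (SimpleEventId) o;
        return id == other.id && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, timestamp);
    }
}

/** Hypothetical stand-in for Lockable: a value plus a reference count. */
final class LockedValue<V> {
    final V value;
    int refCount;

    LockedValue(V value, int refCount) {
        this.value = value;
        this.refCount = refCount;
    }
}

class EventRegistry<V> {
    // Number of events seen so far per timestamp, i.e. the next id to hand out.
    final Map<Long, Integer> eventsCount = new HashMap<>();
    final Map<SimpleEventId, LockedValue<V>> events = new HashMap<>();

    /** Call once per unique event; the event starts with one lock held. */
    SimpleEventId registerEvent(V value, long timestamp) {
        int id = eventsCount.getOrDefault(timestamp, 0);
        eventsCount.put(timestamp, id + 1);
        SimpleEventId eventId = new SimpleEventId(id, timestamp);
        events.put(eventId, new LockedValue<>(value, 1));
        return eventId;
    }

    /** No event earlier than the watermark will arrive, so older counters can go. */
    void advanceTime(long watermark) {
        eventsCount.keySet().removeIf(ts -> ts < watermark);
    }
}
```

Keying the counter by timestamp keeps ids small and lets `advanceTime` discard whole timestamps at once.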
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576247#comment-16576247 ]
ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on a change in pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209232579

## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
##
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * The entries in {@link SharedBufferAccessor} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+	/** The cache of sharedBuffer.*/
+	private SharedBuffer<V> sharedBuffer;
+
+	public SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {

Review comment:
This ctor could be package-protected.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575524#comment-16575524 ]
ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-411923422

Thanks a lot for your review. I have made the changes you mentioned; please take another look, thx @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574958#comment-16574958 ]
ASF GitHub Bot commented on FLINK-9642:
---

dawidwys commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-411788642

Hi @Aitozi, I found that the accessor and the shared buffer depend on each other a bit too much, in both directions. Please have a look at how I would loosen that coupling in this branch: https://github.com/dawidwys/flink/tree/pr/6205. It does not compile, but you should get the idea of how I would split up those two classes.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16565122#comment-16565122 ]
ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-409534657

ping @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561174#comment-16561174 ]
ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-408690962

I was a little busy these days, which delayed this PR. I have now split `SharedBuffer` into:
- `SharedBuffer`, to deal with the NFA query
- `SharedBufferAccessor`, to deal with the underlying `State` directly

Please take a look again @dawidwys, thx.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540114#comment-16540114 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6205

Hi @Aitozi,

Thanks for the work here. I think we could improve the separation of concerns in the SharedBuffer a bit. I am a bit afraid this class will become too complex once again. How about we split the SharedBuffer into two layers: a caching one (`SharedBuffer`) and an accessing one, e.g. `SharedBufferAccessor`. We could make the accessor `AutoCloseable`. I think that would make the necessity to flush the buffer more explicit.

We could move methods like these to `SharedBufferAccessor`:
- registerEvent
- put
- extractPatterns
- materializeMatch
- materializeMatch
- lockNode
- releaseNode
- removeNode
- lockEvent
- releaseEvent

In the `SharedBuffer` we would leave only the package-protected caching methods. We would also add a method there to get the `SharedBufferAccessor` that would `flush` the changes on `close`. This way we would have a cleaner separation, and flushing would become more intuitive. What do you think?
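The proposal above, a caching `SharedBuffer` plus an `AutoCloseable` `SharedBufferAccessor` obtained from it that flushes on `close`, fits Java's try-with-resources. A minimal sketch, assuming hypothetical `CachingBuffer` and `BufferAccessor` classes (not Flink's), a `HashMap` in place of keyed state, and only a put/get pair instead of the full method list; the accessor's constructor is package-private, in line with the later review suggestion.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical caching layer: holds per-round changes and knows how to flush them. */
class CachingBuffer<K, V> {
    // Stand-in for the keyed MapState backing the buffer.
    final Map<K, V> state = new HashMap<>();
    private final Map<K, V> pending = new HashMap<>();

    // Package-private caching methods, meant to be used only by the accessor.
    void cacheValue(K key, V value) {
        pending.put(key, value);
    }

    V lookup(K key) {
        V cached = pending.get(key);
        return cached != null ? cached : state.get(key);
    }

    void flushCache() {
        if (!pending.isEmpty()) {
            state.putAll(pending);
            pending.clear();
        }
    }

    /** Entry point for callers; the returned accessor flushes the cache when closed. */
    BufferAccessor<K, V> getAccessor() {
        return new BufferAccessor<>(this);
    }
}

/** Hypothetical accessing layer exposing the operations the NFA needs. */
class BufferAccessor<K, V> implements AutoCloseable {
    private final CachingBuffer<K, V> buffer;

    // Package-private: callers obtain accessors through CachingBuffer#getAccessor.
    BufferAccessor(CachingBuffer<K, V> buffer) {
        this.buffer = buffer;
    }

    void put(K key, V value) {
        buffer.cacheValue(key, value);
    }

    V get(K key) {
        return buffer.lookup(key);
    }

    /** Closing makes the flush explicit: all cached changes reach the state here. */
    @Override
    public void close() {
        buffer.flushCache();
    }
}
```

A caller would wrap each processing round in `try (BufferAccessor<K, V> accessor = buffer.getAccessor()) { ... }`, so the flush cannot be forgotten, and reads within the round are served from the cache first.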
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539498#comment-16539498 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Resolved the conflicts; please help review when you are free @dawidwys.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535986#comment-16535986 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6205

Hi @Aitozi, I think it has conflicts now.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535553#comment-16535553 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Could you take a look at this PR @dawidwys?
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524462#comment-16524462 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Does this look OK now? ping @sihuazhou @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523794#comment-16523794 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Fixed the increase in state accesses reported by `NFAStateAccessTest` by adding an `isEmpty` check before updating the state.
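The `NFAStateAccessTest` comments in this thread describe a common pitfall: writing the cache back unconditionally (`entries#putAll` in `flushCache`) costs a state access even when nothing changed during the round. The fix is to guard the write with an `isEmpty` check. This sketch assumes a hypothetical `CountingMapState` whose `putAll` counts as one access even for an empty map; whether Flink's test counts accesses exactly this way is an assumption, not something the thread states.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for MapState that counts write calls, like an access-counting test. */
class CountingMapState<K, V> {
    final Map<K, V> data = new HashMap<>();
    int writeCount = 0;

    void putAll(Map<K, V> entries) {
        // Assumption: every call counts as one state access, even with no entries.
        writeCount++;
        data.putAll(entries);
    }
}

class GuardedFlush {
    /** Flushes the cache into state, skipping the state call entirely when nothing changed. */
    static <K, V> void flush(Map<K, V> cache, CountingMapState<K, V> state) {
        if (!cache.isEmpty()) {
            state.putAll(cache);
            cache.clear();
        }
    }
}
```

With the guard, a round that touched no nodes performs no write at all, so the access count stays at its pre-cache level.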
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523771#comment-16523771 ]
ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Using `entries#putAll` in `flushCache` increased the access count in `NFAStateAccessTest`. I will check it locally; this Travis build will fail.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523754#comment-16523754 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198151182

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		}
+		);
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

Got it.
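The caching idea in this diff (look a `Lockable` up in an in-memory map before touching state, keep the updated reference counts there, and write them back once at the end of a processing pass) can be sketched in isolation as below. This is a minimal illustration under stated assumptions, not the PR's code: `RefCounted` and `CachedRefCountState` are hypothetical names, a plain `Map` stands in for Flink's `MapState`, and clearing the cache after `flush()` is an assumption the quoted diff does not show. Note that a `HashMap` hands out live object references, while a RocksDB-backed state returns deserialized copies, which is why writing the updated count back matters there.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Flink's Lockable: a value plus a reference count. */
final class RefCounted<T> {
    final T value;
    private int refCounter;

    RefCounted(T value, int refCounter) {
        this.value = value;
        this.refCounter = refCounter;
    }

    void lock() {
        refCounter++;
    }

    /** Decrements the count and returns true once no references remain. */
    boolean release() {
        if (refCounter > 0) {
            refCounter--;
        }
        return refCounter == 0;
    }

    int refCount() {
        return refCounter;
    }
}

/** Read-through, write-back cache in front of a keyed state that is costly to access. */
final class CachedRefCountState<K, T> {
    private final Map<K, RefCounted<T>> state; // stands in for a (e.g. RocksDB-backed) MapState
    private final Map<K, RefCounted<T>> cache = new HashMap<>();
    int stateReads;
    int stateWrites;

    CachedRefCountState(Map<K, RefCounted<T>> state) {
        this.state = state;
    }

    /** Repeated lookups of the same key within one pass are served from the cache. */
    private RefCounted<T> get(K key) {
        RefCounted<T> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        stateReads++;
        return state.get(key);
    }

    void lock(K key) {
        RefCounted<T> wrapper = get(key);
        if (wrapper != null) {
            wrapper.lock();
            cache.put(key, wrapper); // write deferred until flush()
        }
    }

    void release(K key) {
        RefCounted<T> wrapper = get(key);
        if (wrapper == null) {
            return;
        }
        if (wrapper.release()) {
            // Last reference gone: drop it from both the cache and the state.
            cache.remove(key);
            state.remove(key);
            stateWrites++;
        } else {
            cache.put(key, wrapper); // write deferred until flush()
        }
    }

    /** Writes every cached entry back once, e.g. at the end of processing one element. */
    void flush() {
        stateWrites += cache.size();
        state.putAll(cache);
        cache.clear(); // assumption: the cache only lives for one processing pass
    }
}
```

For example, two `lock` calls followed by one `release` on the same key cost one state read, plus one state write when `flush()` runs. If every operation instead did a `get` followed by a `put`, as the removed lines of `releaseEvent` do, the same three calls would cost three reads and three writes.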
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523733#comment-16523733 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146934

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		}
+		);
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

In fact, what I mean is that if you want to throw an exception here, you could throw it as `throw new RuntimeException("exception message", originalException)`; this way the original exception won't be swallowed.
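Following the suggestion in this comment, a minimal sketch of a `forEach` flush that keeps the original exception as the cause. `FlushWithCause` and `CheckedStore` are hypothetical names; `CheckedStore` only mirrors a state `put` that declares `throws Exception`, and the message text is illustrative.

```java
import java.util.Map;

final class FlushWithCause {
    /** Hypothetical store whose put declares a checked exception, like a state backend call. */
    interface CheckedStore<K, V> {
        void put(K key, V value) throws Exception;
    }

    /** Flushes via forEach; a checked exception is rethrown unchecked, with the original as its cause. */
    static <K, V> void flush(Map<K, V> cache, CheckedStore<K, V> store) {
        cache.forEach((k, v) -> {
            try {
                store.put(k, v);
            } catch (Exception e) {
                // Passing e as the cause keeps its type, message and stack trace.
                throw new RuntimeException("Failed to flush key " + k + " to state", e);
            }
        });
    }
}
```

A caller that needs the original type back can inspect `getCause()` on the caught `RuntimeException`.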
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523727#comment-16523727 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146056

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
--- End diff --

Yes.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523721#comment-16523721 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198143091

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		}
+		);
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

But I don't know how to handle the exception inside a Java 8 lambda (stream API). Do you have a better way to deal with this situation? Thanks.
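One common answer to this question, sketched under the assumption that the enclosing method may declare `throws Exception` (as `flushCache()` in the diff already does): iterate with a plain `for` loop instead of a lambda, so the checked exception propagates unchanged and no wrapping is needed. `LoopFlush` and `CheckedStore` are hypothetical names used only for illustration.

```java
import java.util.Map;

final class LoopFlush {
    /** Hypothetical store whose put declares a checked exception, like a state backend call. */
    interface CheckedStore<K, V> {
        void put(K key, V value) throws Exception;
    }

    /** A plain loop lets a checked exception propagate unchanged; no wrapping is needed. */
    static <K, V> void flush(Map<K, V> cache, CheckedStore<K, V> store) throws Exception {
        for (Map.Entry<K, V> entry : cache.entrySet()) {
            store.put(entry.getKey(), entry.getValue());
        }
    }
}
```

The trade-off is purely stylistic: the loop is slightly longer than `forEach`, but the caller sees the backend's own exception rather than a wrapper.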
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523718#comment-16523718 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198142501

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
--- End diff --

OK. Does this benefit from the `RocksDBWriteBatchWrapper` when using `putAll`?
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523713#comment-16523713 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135717

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
--- End diff --

I would suggest using `entries.putAll()` instead, since it can give better performance when you are using the RocksDB backend.
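A minimal sketch of the suggested change, assuming a hypothetical `MiniMapState` interface that mirrors the `put` and `putAll` methods of Flink's `MapState`. A single `putAll` call replaces the per-entry lambda, lets a batching backend apply all entries together (this thread indicates that the RocksDB backend uses `RocksDBWriteBatchWrapper` for `putAll`), and, because there is no lambda, the checked exception from `putAll` propagates without wrapping. The counter in the hypothetical `CountingMapState` only illustrates the number of backend calls and says nothing about real RocksDB timings; clearing the cache after the flush is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical subset of a keyed map state; Flink's MapState declares methods of this shape. */
interface MiniMapState<K, V> {
    void put(K key, V value) throws Exception;

    void putAll(Map<K, V> map) throws Exception;
}

/** In-memory implementation that counts backend calls, to compare the two flush styles. */
final class CountingMapState<K, V> implements MiniMapState<K, V> {
    final Map<K, V> data = new HashMap<>();
    int calls;

    @Override
    public void put(K key, V value) {
        calls++;
        data.put(key, value);
    }

    @Override
    public void putAll(Map<K, V> map) {
        calls++; // a batching backend can apply the whole map as one write batch
        data.putAll(map);
    }
}

final class PutAllFlush {
    /** One bulk call instead of a per-entry lambda; the checked exception simply propagates. */
    static <K, V> void flush(Map<K, V> cache, MiniMapState<K, V> state) throws Exception {
        state.putAll(cache);
        cache.clear(); // assumption: the cache is reset after each flush
    }
}
```

With three cached entries, the per-entry loop makes three backend calls, while the `putAll` flush makes one and leaves the state with the same contents.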
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523712#comment-16523712 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136425

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		}
+		);
+
+		eventsBufferCache.forEach((k, v) -> {
+			try {
+				eventsBuffer.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

Same here: do not swallow the original exception.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523714#comment-16523714 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135796

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
+			}
+		}
+		);
+
+		eventsBufferCache.forEach((k, v) -> {
--- End diff --

Same here.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16523711#comment-16523711 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136431

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put an event to cache.
+	 * @param eventId id of the event
+	 * @param event event body
+	 */
+	private void cacheEvent(EventId eventId, Lockable event) {
+		this.eventsBufferCache.put(eventId, event);
+	}
+
+	/**
+	 * Put a ShareBufferNode to cache.
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	private void cacheEntry(NodeId nodeId, Lockable entry) {
+		this.entryCache.put(nodeId, entry);
+	}
+
+	/
+	// Get
+	/
+
+	/**
+	 * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+	 * @param nodeId id of the event
+	 * @return SharedBufferNode
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private Lockable getEntry(NodeId nodeId) throws Exception {
+		Lockable entry = entryCache.get(nodeId);
+		return entry != null ? entry : entries.get(nodeId);
+	}
+
+	private Lockable getEvent(EventId eventId) throws Exception {
+		Lockable event = eventsBufferCache.get(eventId);
+		return event != null ? event : eventsBuffer.get(eventId);
+	}
+
+	/**
+	 * Flush the event and node in map to state.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void flushCache() throws Exception {
+		entryCache.forEach((k, v) -> {
+			try {
+				entries.put(k, v);
+			} catch (Exception e) {
+				throw new RuntimeException();
--- End diff --

I would suggest not swallowing the original exception here.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521405#comment-16521405 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

The Travis failure this time seems unrelated.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521386#comment-16521386 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629310

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable eventWrapper = eventsBuffer.get(eventId);
+		Lockable eventWrapper = getEvent(eventId);
 		if (eventWrapper != null) {
 			if (eventWrapper.release()) {
 				eventsBuffer.remove(eventId);
+				eventsBufferCache.remove(eventId);
 			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+				cacheEvent(eventId, eventWrapper);
 			}
 		}
 	}
+	// Cache related method
+
+	/
+	// Put
+	/
+
+	/**
+	 * Put a event to cache.
+	 * @param eventId id of the event
--- End diff --

Fixed.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521384#comment-16521384 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6205

Hi @zhangminglei, thanks for your review. Before, I had only run SharedBufferTest locally. The Travis error means that the number of state accesses (reads and writes) is now lower than before, which is the purpose of this PR, and I have fixed the error.

cc @dawidwys
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521383#comment-16521383 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629304

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
      * @throws Exception Thrown if the system cannot access the state.
      */
     public void releaseEvent(EventId eventId) throws Exception {
-        Lockable eventWrapper = eventsBuffer.get(eventId);
+        Lockable eventWrapper = getEvent(eventId);
         if (eventWrapper != null) {
             if (eventWrapper.release()) {
                 eventsBuffer.remove(eventId);
+                eventsBufferCache.remove(eventId);
             } else {
-                eventsBuffer.put(eventId, eventWrapper);
+                cacheEvent(eventId, eventWrapper);
             }
         }
     }
+    // Cache related method
+
+    /
+    // Put
+    /
+
+    /**
+     * Put a event to cache.
+     * @param eventId id of the event
+     * @param event event body
+     */
+    private void cacheEvent(EventId eventId, Lockable event) {
+        this.eventsBufferCache.put(eventId, event);
+    }
+
+    /**
+     * Put a ShareBufferNode to cache.
+     * @param nodeId id of the event
+     * @param entry SharedBufferNode
+     */
+    private void cacheEntry(NodeId nodeId, Lockable entry) {
+        this.entryCache.put(nodeId, entry);
+    }
+
+    /
+    // Get
+    /
+
+    /**
+     * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+     * @param nodeId id of the event
--- End diff --

Yes, thanks for letting me know.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521380#comment-16521380 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629205

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
      * @throws Exception Thrown if the system cannot access the state.
      */
     public void releaseEvent(EventId eventId) throws Exception {
-        Lockable eventWrapper = eventsBuffer.get(eventId);
+        Lockable eventWrapper = getEvent(eventId);
         if (eventWrapper != null) {
             if (eventWrapper.release()) {
                 eventsBuffer.remove(eventId);
+                eventsBufferCache.remove(eventId);
             } else {
-                eventsBuffer.put(eventId, eventWrapper);
+                cacheEvent(eventId, eventWrapper);
             }
         }
     }
+    // Cache related method
+
+    /
+    // Put
+    /
+
+    /**
+     * Put a event to cache.
+     * @param eventId id of the event
+     * @param event event body
+     */
+    private void cacheEvent(EventId eventId, Lockable event) {
+        this.eventsBufferCache.put(eventId, event);
+    }
+
+    /**
+     * Put a ShareBufferNode to cache.
+     * @param nodeId id of the event
+     * @param entry SharedBufferNode
+     */
+    private void cacheEntry(NodeId nodeId, Lockable entry) {
+        this.entryCache.put(nodeId, entry);
+    }
+
+    /
+    // Get
+    /
+
+    /**
+     * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+     * @param nodeId id of the event
--- End diff --

It means `if and only if`.
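As a hedged illustration of the `iff` wording in the Javadoc under discussion, the sketch below queries the state if and only if the key has not already been looked up during the current pass. `PassCache` and its `stateReader` function are hypothetical stand-ins for the per-pass cache and the `MapState` lookup, not the code from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-pass read-through cache: the backing state is queried
// if and only if the key has not been looked up yet during the current pass.
final class PassCache<K, V> {
    private final Function<K, V> stateReader; // stands in for a MapState lookup
    private final Map<K, V> cache = new HashMap<>();
    int stateReads;

    PassCache(Function<K, V> stateReader) {
        this.stateReader = stateReader;
    }

    V get(K key) {
        if (cache.containsKey(key)) {
            return cache.get(key); // already looked up in this pass
        }
        stateReads++;
        V value = stateReader.apply(key);
        cache.put(key, value); // misses are remembered too (HashMap allows null values)
        return value;
    }

    // Starts a new pass; a real implementation would flush pending writes first.
    void clear() {
        cache.clear();
    }
}
```

Because misses are cached as `null` as well, a key that is absent from the state is still read only once per pass; whether the actual change also caches misses is not visible in the quoted diff.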
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521381#comment-16521381 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197629218

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -75,6 +76,9 @@
     private MapState eventsCount;
     private MapState> entries;
+    private HashMap> eventsBufferCache = new HashMap<>();
+    private HashMap> entryCache = new HashMap<>();
--- End diff --

Agreed and fixed.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521365#comment-16521365 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6205

Hi @Aitozi, could you please take a look at the Travis error?

```
Tests run: 2, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 0.014 sec <<< FAILURE! - in org.apache.flink.cep.nfa.NFAStateAccessTest
testIterativeWithABACPattern(org.apache.flink.cep.nfa.NFAStateAccessTest) Time elapsed: 0.009 sec <<< FAILURE!
java.lang.AssertionError: expected:<20> but was:<8>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at org.apache.flink.cep.nfa.NFAStateAccessTest.testIterativeWithABACPattern(NFAStateAccessTest.java:193)

testComplexBranchingAfterZeroOrMore(org.apache.flink.cep.nfa.NFAStateAccessTest) Time elapsed: 0.005 sec <<< FAILURE!
java.lang.AssertionError: expected:<5> but was:<2>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at org.apache.flink.cep.nfa.NFAStateAccessTest.testComplexBranchingAfterZeroOrMore(NFAStateAccessTest.java:112)

Running org.apache.flink.cep.nfa.compiler.NFACompilerTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.019 sec - in org.apache.flink.cep.nfa.compiler.NFACompilerTest
Running org.apache.flink.cep.nfa.DeweyNumberTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.01 sec - in org.apache.flink.cep.nfa.DeweyNumberTest
Running org.apache.flink.cep.nfa.sharedbuffer.SharedBufferTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec - in org.apache.flink.cep.nfa.sharedbuffer.SharedBufferTest
Running org.apache.flink.cep.pattern.PatternTest
Tests run: 22, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.033 sec - in org.apache.flink.cep.pattern.PatternTest

Results :

Failed tests:
  NFAStateAccessTest.testComplexBranchingAfterZeroOrMore:112 expected:<5> but was:<2>
  NFAStateAccessTest.testIterativeWithABACPattern:193 expected:<20> but was:<8>

Tests run: 80, Failures: 2, Errors: 0, Skipped: 12

13:21:32.852 [INFO]
13:21:32.852 [INFO] Reactor Summary:
13:21:32.852 [INFO]
13:21:32.852 [INFO] flink-queryable-state-client-java .. SUCCESS [ 4.830 s]
13:21:32.852 [INFO] flink-cep .. FAILURE [ 5.861 s]
13:21:32.852 [INFO] flink-table SKIPPED
13:21:32.852 [INFO] flink-queryable-state-runtime .. SKIPPED
13:21:32.852 [INFO] flink-gelly SKIPPED
13:21:32.852 [INFO] flink-gelly-scala .. SKIPPED
13:21:32.852 [INFO] flink-gelly-examples ... SKIPPED
13:21:32.852 [INFO] flink-python ... SKIPPED
13:21:32.852 [INFO] flink-ml ... SKIPPED
13:21:32.852 [INFO] flink-cep-scala SKIPPED
```
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521362#comment-16521362 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627451

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -75,6 +76,9 @@
     private MapState eventsCount;
     private MapState> entries;
+    private HashMap> eventsBufferCache = new HashMap<>();
+    private HashMap> entryCache = new HashMap<>();
--- End diff --

I would suggest changing it to `private Map> eventsBufferCache = new HashMap<>();`
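A small sketch of the suggested style, using hypothetical type parameters `K` and `V` in place of the real key and value types: the field is declared by the `Map` interface, and `HashMap` is named only where the map is constructed. `CacheHolder` is an illustrative class, not part of the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder illustrating the review suggestion: the field is declared
// by the Map interface, and HashMap appears only at the construction site.
final class CacheHolder<K, V> {
    private final Map<K, V> eventsBufferCache = new HashMap<>();

    void cacheEvent(K id, V event) {
        eventsBufferCache.put(id, event);
    }

    V cached(K id) {
        return eventsBufferCache.get(id);
    }

    int size() {
        return eventsBufferCache.size();
    }
}
```

Declaring against the interface keeps the rest of the class independent of the concrete map, so the implementation could later be swapped (for example to a `LinkedHashMap`) by changing a single line.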
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521364#comment-16521364 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627422

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
      * @throws Exception Thrown if the system cannot access the state.
      */
     public void releaseEvent(EventId eventId) throws Exception {
-        Lockable eventWrapper = eventsBuffer.get(eventId);
+        Lockable eventWrapper = getEvent(eventId);
         if (eventWrapper != null) {
             if (eventWrapper.release()) {
                 eventsBuffer.remove(eventId);
+                eventsBufferCache.remove(eventId);
             } else {
-                eventsBuffer.put(eventId, eventWrapper);
+                cacheEvent(eventId, eventWrapper);
             }
         }
     }
+    // Cache related method
+
+    /
+    // Put
+    /
+
+    /**
+     * Put a event to cache.
+     * @param eventId id of the event
+     * @param event event body
+     */
+    private void cacheEvent(EventId eventId, Lockable event) {
+        this.eventsBufferCache.put(eventId, event);
+    }
+
+    /**
+     * Put a ShareBufferNode to cache.
+     * @param nodeId id of the event
+     * @param entry SharedBufferNode
+     */
+    private void cacheEntry(NodeId nodeId, Lockable entry) {
+        this.entryCache.put(nodeId, entry);
+    }
+
+    /
+    // Get
+    /
+
+    /**
+     * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process.
+     * @param nodeId id of the event
--- End diff --

`iff`? I think it should be `if`.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521363#comment-16521363 ]

ASF GitHub Bot commented on FLINK-9642:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r197627410

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception {
      * @throws Exception Thrown if the system cannot access the state.
      */
     public void releaseEvent(EventId eventId) throws Exception {
-        Lockable eventWrapper = eventsBuffer.get(eventId);
+        Lockable eventWrapper = getEvent(eventId);
         if (eventWrapper != null) {
             if (eventWrapper.release()) {
                 eventsBuffer.remove(eventId);
+                eventsBufferCache.remove(eventId);
             } else {
-                eventsBuffer.put(eventId, eventWrapper);
+                cacheEvent(eventId, eventWrapper);
             }
         }
     }
+    // Cache related method
+
+    /
+    // Put
+    /
+
+    /**
+     * Put a event to cache.
+     * @param eventId id of the event
--- End diff --

Put an event.
[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process
[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16521092#comment-16521092 ]

ASF GitHub Bot commented on FLINK-9642:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/6205

[FLINK-9642]Reduce the count to deal with state during a CEP process

## What is the purpose of the change

With the rework of SharedBuffer in FLINK-9418, the lock and release operations are performed directly against the state backend, unlike the previous version, which read the whole SharedBuffer state into memory. I think we can add a cache (or a variable) in SharedBuffer that holds the Lockable objects and tracks their reference-count changes during a single NFA processing pass. This reduces the number of state accesses when several events point to the same node. The result is then flushed to MapState at the end of the pass.

## Brief change log

- add the eventsBufferCache and entryCache
- flush the cache at the end of each processing pass

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink onceQueryCache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6205.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6205

commit 184ec24474ee5b8a1c9f932286ef4aed4f1dabd6
Author: minwenjun
Date: 2018-06-23T12:56:55Z

[FLINK-9642]Reduce the count to deal with state during a CEP process
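The change log above (cache the wrappers, then flush once per pass) amounts to a write-back cache. The following is a minimal sketch under stated assumptions: `WriteBackBuffer` is hypothetical, and a plain `Map` stands in for Flink's `MapState`. Reads check the cache before the state, writes go only to the cache, and `flush()` writes each cached entry to the state once and then clears the cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical write-back buffer: updates made while processing one event stay in
// memory and reach the backing state (a plain Map standing in for MapState) only on flush().
final class WriteBackBuffer<K, V> {
    private final Map<K, V> state;
    private final Map<K, V> cache = new HashMap<>();
    int stateWrites;

    WriteBackBuffer(Map<K, V> state) {
        this.state = state;
    }

    // Reads see pending updates first, then fall back to the state.
    V get(K key) {
        return cache.containsKey(key) ? cache.get(key) : state.get(key);
    }

    // Writes only touch the cache; repeated writes to one key collapse into one entry.
    void put(K key, V value) {
        cache.put(key, value);
    }

    // Called once at the end of a pass: one state write per distinct key, then reset.
    void flush() {
        for (Map.Entry<K, V> e : cache.entrySet()) {
            state.put(e.getKey(), e.getValue());
            stateWrites++;
        }
        cache.clear();
    }
}
```

Removals are not buffered in this sketch; in the quoted `releaseEvent` diff they are applied to both `eventsBuffer` and `eventsBufferCache` immediately.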