This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f130e4b  [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
f130e4b is described below

commit f130e4bc259746b1542a2f4d8907d3b35195feb8
Author: liyafan82 <42827532+liyafa...@users.noreply.github.com>
AuthorDate: Fri Jul 5 14:44:45 2019 +0800

    [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
---
 .../cep/nfa/sharedbuffer/SharedBufferAccessor.java | 45 +++++++++++-----------
 .../cep/nfa/sharedbuffer/SharedBufferTest.java     | 38 ++++++++++++++++++
 2 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
index 2613f9d..c92df1b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
@@ -235,31 +235,32 @@ public class SharedBufferAccessor<V> implements AutoCloseable {
         * @throws Exception Thrown if the system cannot access the state.
         */
        public void releaseNode(final NodeId node) throws Exception {
-               Lockable<SharedBufferNode> sharedBufferNode = 
sharedBuffer.getEntry(node);
-               if (sharedBufferNode != null) {
-                       if (sharedBufferNode.release()) {
-                               removeNode(node, sharedBufferNode.getElement());
-                       } else {
-                               sharedBuffer.upsertEntry(node, 
sharedBufferNode);
+               // the stack used to detect all nodes that needs to be released.
+               Stack<NodeId> nodesToExamine = new Stack<>();
+               nodesToExamine.push(node);
+
+               while (!nodesToExamine.isEmpty()) {
+                       NodeId curNode = nodesToExamine.pop();
+                       Lockable<SharedBufferNode> curBufferNode = 
sharedBuffer.getEntry(curNode);
+
+                       if (curBufferNode == null) {
+                               break;
                        }
-               }
-       }
 
-       /**
-        * Removes the {@code SharedBufferNode}, when the ref is decreased to 
zero, and also
-        * decrease the ref of the edge on this node.
-        *
-        * @param node id of the entry
-        * @param sharedBufferNode the node body to be removed
-        * @throws Exception Thrown if the system cannot access the state.
-        */
-       private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) 
throws Exception {
-               sharedBuffer.removeEntry(node);
-               EventId eventId = node.getEventId();
-               releaseEvent(eventId);
+                       if (curBufferNode.release()) {
+                               // first release the current node
+                               sharedBuffer.removeEntry(curNode);
+                               releaseEvent(curNode.getEventId());
 
-               for (SharedBufferEdge sharedBufferEdge : 
sharedBufferNode.getEdges()) {
-                       releaseNode(sharedBufferEdge.getTarget());
+                               for (SharedBufferEdge sharedBufferEdge : 
curBufferNode.getElement().getEdges()) {
+                                       NodeId targetId = 
sharedBufferEdge.getTarget();
+                                       if (targetId != null) {
+                                               nodesToExamine.push(targetId);
+                                       }
+                               }
+                       } else {
+                               sharedBuffer.upsertEntry(curNode, 
curBufferNode);
+                       }
                }
        }
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 0583e8b..abd5b8b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -280,4 +280,42 @@ public class SharedBufferTest extends TestLogger {
                assertEquals(4, sharedBuffer.getSharedBufferNodeSize());
        }
 
+       /**
+        * Test releasing a node which has a long path to the terminal node 
(the node without an out-going edge).
+        * @throws Exception if creating the shared buffer accessor fails.
+        */
+       @Test
+       public void testReleaseNodesWithLongPath() throws Exception {
+               SharedBuffer<Event> sharedBuffer = 
TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+
+               final int numberEvents = 100000;
+               Event[] events = new Event[numberEvents];
+               EventId[] eventIds = new EventId[numberEvents];
+               NodeId[] nodeIds = new NodeId[numberEvents];
+
+               final long timestamp = 1L;
+
+               for (int i = 0; i < numberEvents; i++) {
+                       events[i] = new Event(i + 1, "e" + (i + 1), i);
+                       eventIds[i] = sharedBuffer.registerEvent(events[i], 
timestamp);
+               }
+
+               try (SharedBufferAccessor<Event> sharedBufferAccessor = 
sharedBuffer.getAccessor()) {
+
+                       for (int i = 0; i < numberEvents; i++) {
+                               NodeId prevId = i == 0 ? null : nodeIds[i - 1];
+                               nodeIds[i] = sharedBufferAccessor.put("n" + i, 
eventIds[i], prevId, DeweyNumber.fromString("1.0"));
+                       }
+
+                       NodeId lastNode = nodeIds[numberEvents - 1];
+                       sharedBufferAccessor.releaseNode(lastNode);
+
+                       for (int i = 0; i < numberEvents; i++) {
+                               sharedBufferAccessor.releaseEvent(eventIds[i]);
+                       }
+               }
+
+               assertTrue(sharedBuffer.isEmpty());
+       }
+
 }

Reply via email to