This is an automated email from the ASF dual-hosted git repository.

cammckenzie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new e2d32bd  CURATOR-561 Reset connection after repeat expiry
e2d32bd is described below

commit e2d32bd00ec05a18bf149a47865fd807e00a60e1
Author: Scott Kirkpatrick <scott.kirkpatr...@appian.com>
AuthorDate: Wed Nov 3 12:53:48 2021 -0400

    CURATOR-561 Reset connection after repeat expiry
    
    If posting the Expired KeeperState fails during a session
    expiration, the ZooKeeper event thread dies without ever
    posting the Expired event. Curator then keeps trying to
    expire the connection, but this does nothing: the connection
    is dead and no further events will ever be posted.
    
    This commit prevents that by forcibly resetting the connection
    when it detects that the previous expiry had no effect.
---
 .../framework/state/ConnectionStateManager.java    | 10 ++++-
 .../state/TestConnectionStateManager.java          | 45 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index 9ee09b0..1b5f03b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -77,6 +77,8 @@ public class ConnectionStateManager implements Closeable
 
     private volatile long startOfSuspendedEpoch = 0;
 
+    private volatile long lastExpiredInstanceIndex = -1;
+
     private enum State
     {
         LATENT,
@@ -318,7 +320,13 @@ public class ConnectionStateManager implements Closeable
                 log.warn(String.format("Session timeout has elapsed while 
SUSPENDED. Injecting a session expiration. Elapsed ms: %d. Adjusted session 
timeout ms: %d", elapsedMs, useSessionTimeoutMs));
                 try
                 {
-                    
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
+                    if (lastExpiredInstanceIndex == 
client.getZookeeperClient().getInstanceIndex()) {
+                        // last expiration didn't work for this instance, so 
event thread is dead and a reset is needed. CURATOR-561
+                        client.getZookeeperClient().reset();
+                    } else {
+                        lastExpiredInstanceIndex = 
client.getZookeeperClient().getInstanceIndex();
+                        
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
+                    }
                 }
                 catch ( Exception e )
                 {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
index 4bd94e2..67d36cc 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java
@@ -18,8 +18,11 @@
  */
 package org.apache.curator.framework.state;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.google.common.collect.Queues;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
@@ -27,9 +30,12 @@ import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.test.compatibility.Timing2;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -83,4 +89,43 @@ public class TestConnectionStateManager extends BaseClassForTests {
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testConnectionStateRecoversFromUnexpectedExpiredConnection() 
throws Exception {
+        Timing2 timing = new Timing2();
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .connectionTimeoutMs(1_000)
+                .sessionTimeoutMs(250) // try to aggressively expire the 
connection
+                .retryPolicy(new RetryOneTime(1))
+                .connectionStateErrorPolicy(new 
SessionConnectionStateErrorPolicy())
+                .build();
+        final BlockingQueue<ConnectionState> queue = 
Queues.newLinkedBlockingQueue();
+        ConnectionStateListener listener = (client1, state) -> 
queue.add(state);
+        client.getConnectionStateListenable().addListener(listener);
+        client.start();
+        try {
+            ConnectionState polled = queue.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            assertEquals(polled, ConnectionState.CONNECTED);
+            
client.getZookeeperClient().getZooKeeper().getTestable().queueEvent(new 
WatchedEvent(
+                    Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Disconnected, null));
+            polled = queue.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            assertEquals(polled, ConnectionState.SUSPENDED);
+            assertThrows(RuntimeException.class, () -> 
client.getZookeeperClient()
+                    .getZooKeeper().getTestable().queueEvent(new WatchedEvent(
+                    Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Expired, null) {
+                @Override
+                public String getPath() {
+                    // exception will cause ZooKeeper to update current state 
but fail to notify watchers
+                    throw new RuntimeException("Path doesn't exist!");
+                }
+            }));
+            polled = queue.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            assertEquals(polled, ConnectionState.LOST);
+            polled = queue.poll(timing.forWaiting().seconds(), 
TimeUnit.SECONDS);
+            assertEquals(polled, ConnectionState.RECONNECTED);
+        } finally {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }

Reply via email to