This is an automated email from the ASF dual-hosted git repository. cammckenzie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/curator.git
The following commit(s) were added to refs/heads/master by this push: new e2d32bd CURATOR-561 Reset connection after repeat expiry e2d32bd is described below commit e2d32bd00ec05a18bf149a47865fd807e00a60e1 Author: Scott Kirkpatrick <scott.kirkpatr...@appian.com> AuthorDate: Wed Nov 3 12:53:48 2021 -0400 CURATOR-561 Reset connection after repeat expiry If there is a problem posting the Expired KeeperState during a session expiration, then the ZooKeeper event thread will die without ever posting the Expired event. Curator would then keep trying to expire the connection, but each attempt does nothing because the connection is dead and no events will ever be posted. This can be prevented by forcibly resetting the connection when it is detected that the previous expiry had no effect. --- .../framework/state/ConnectionStateManager.java | 10 ++++- .../state/TestConnectionStateManager.java | 45 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java index 9ee09b0..1b5f03b 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java @@ -77,6 +77,8 @@ public class ConnectionStateManager implements Closeable private volatile long startOfSuspendedEpoch = 0; + private volatile long lastExpiredInstanceIndex = -1; + private enum State { LATENT, @@ -318,7 +320,13 @@ public class ConnectionStateManager implements Closeable log.warn(String.format("Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: %d. 
Adjusted session timeout ms: %d", elapsedMs, useSessionTimeoutMs)); try { - client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + if (lastExpiredInstanceIndex == client.getZookeeperClient().getInstanceIndex()) { + // last expiration didn't work for this instance, so event thread is dead and a reset is needed. CURATOR-561 + client.getZookeeperClient().reset(); + } else { + lastExpiredInstanceIndex = client.getZookeeperClient().getInstanceIndex(); + client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + } } catch ( Exception e ) { diff --git a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java index 4bd94e2..67d36cc 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/state/TestConnectionStateManager.java @@ -18,8 +18,11 @@ */ package org.apache.curator.framework.state; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.collect.Queues; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; @@ -27,9 +30,12 @@ import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.compatibility.CuratorTestBase; import org.apache.curator.test.compatibility.Timing2; import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.TimeUnit; @@ -83,4 +89,43 @@ public class TestConnectionStateManager extends BaseClassForTests { CloseableUtils.closeQuietly(client); } } + + @Test + public void testConnectionStateRecoversFromUnexpectedExpiredConnection() throws Exception { + Timing2 timing = new Timing2(); + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .connectionTimeoutMs(1_000) + .sessionTimeoutMs(250) // try to aggressively expire the connection + .retryPolicy(new RetryOneTime(1)) + .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy()) + .build(); + final BlockingQueue<ConnectionState> queue = Queues.newLinkedBlockingQueue(); + ConnectionStateListener listener = (client1, state) -> queue.add(state); + client.getConnectionStateListenable().addListener(listener); + client.start(); + try { + ConnectionState polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertEquals(polled, ConnectionState.CONNECTED); + client.getZookeeperClient().getZooKeeper().getTestable().queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null)); + polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertEquals(polled, ConnectionState.SUSPENDED); + assertThrows(RuntimeException.class, () -> client.getZookeeperClient() + .getZooKeeper().getTestable().queueEvent(new WatchedEvent( + Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null) { + @Override + public String getPath() { + // exception will cause ZooKeeper to update current state but fail to notify watchers + throw new RuntimeException("Path doesn't exist!"); + } + })); + polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertEquals(polled, ConnectionState.LOST); + polled = queue.poll(timing.forWaiting().seconds(), TimeUnit.SECONDS); + assertEquals(polled, ConnectionState.RECONNECTED); + } finally { + CloseableUtils.closeQuietly(client); + } + 
} }