Author: breed Date: Tue Aug 17 20:03:54 2010 New Revision: 986470 URL: http://svn.apache.org/viewvc?rev=986470&view=rev Log: ZOOKEEPER-795. eventThread isn't shutdown after a connection "session expired" event coming
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=986470&r1=986469&r2=986470&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Tue Aug 17 20:03:54 2010 @@ -12,6 +12,8 @@ BUGFIXES: ZOOKEEPER-772. zkpython segfaults when watcher from async get children is invoked. (henry robinson via mahadev) + ZOOKEEPER-795. eventThread isn't shutdown after a connection "session expired" event coming (sergey doroshenko via breed) + IMPROVEMENTS: ZOOKEEPER-789. Improve FLE log messages (flavio via phunt) Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=986470&r1=986469&r2=986470&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java Tue Aug 17 20:03:54 2010 @@ -430,6 +430,17 @@ public class ClientCnxn { } } + /** + * Guard against creating "-EventThread-EventThread-EventThread-..." thread + * names when ZooKeeper object is being created from within a watcher. + * See ZOOKEEPER-795 for details. + */ + private static String makeThreadName(String suffix) { + String name = Thread.currentThread().getName(). + replaceAll("-EventThread", ""); + return name + suffix; + } + class EventThread extends Thread { private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); @@ -441,7 +452,7 @@ public class ClientCnxn { private volatile KeeperState sessionState = KeeperState.Disconnected; EventThread() { - super(currentThread().getName() + "-EventThread"); + super(makeThreadName("-EventThread")); setUncaughtExceptionHandler(uncaughtExceptionHandler); setDaemon(true); } @@ -689,6 +700,7 @@ public class ClientCnxn { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); + eventThread.queueEventOfDeath(); throw new SessionExpiredException( "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(sessionId) + " has expired"); @@ -898,7 +910,7 @@ public class ClientCnxn { } SendThread() { - super(currentThread().getName() + "-SendThread()"); + super(makeThreadName("-SendThread()")); zooKeeper.state = States.CONNECTING; setUncaughtExceptionHandler(uncaughtExceptionHandler); setDaemon(true); Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=986470&r1=986469&r2=986470&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java Tue Aug 17 20:03:54 2010 @@ -23,7 +23,9 @@ import static org.apache.zookeeper.test. import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -155,12 +157,12 @@ public class SessionTest extends TestCas // zk.close(); // } - @Test /** * This test verifies that when the session id is reused, and the original * client is disconnected, but not session closed, that the server * will remove ephemeral nodes created by the original session. */ + @Test public void testSession() throws IOException, InterruptedException, KeeperException { @@ -195,18 +197,55 @@ public class SessionTest extends TestCas zk.close(); } - @Test + private List<Thread> findThreads(String name) { + int threadCount = Thread.activeCount(); + Thread threads[] = new Thread[threadCount*2]; + threadCount = Thread.enumerate(threads); + ArrayList<Thread> list = new ArrayList<Thread>(); + for(int i = 0; i < threadCount; i++) { + if (threads[i].getName().indexOf(name) != -1) { + list.add(threads[i]); + } + } + return list; + } + /** * Make sure ephemerals get cleaned up when a session times out. */ + @Test public void testSessionTimeout() throws Exception { final int TIMEOUT = 5000; + List<Thread> etBefore = findThreads("EventThread"); + List<Thread> stBefore = findThreads("SendThread"); DisconnectableZooKeeper zk = createClient(TIMEOUT); zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - zk.disconnect(); + + // Find the new event and send threads + List<Thread> etAfter = findThreads("EventThread"); + List<Thread> stAfter = findThreads("SendThread"); + Thread eventThread = null; + Thread sendThread = null; + for(Thread t: etAfter) { + if (!etBefore.contains(t)) { + eventThread = t; + break; + } + } + for(Thread t: stAfter) { + if (!stBefore.contains(t)) { + sendThread = t; + break; + } + } + sendThread.suspend(); + //zk.disconnect(); Thread.sleep(TIMEOUT*2); + sendThread.resume(); + eventThread.join(TIMEOUT); + assertFalse("EventThread is still running", eventThread.isAlive()); zk = createClient(TIMEOUT); zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,