Author: mahadev
Date: Thu Aug  5 07:09:02 2010
New Revision: 982485

URL: http://svn.apache.org/viewvc?rev=982485&view=rev
Log:
ZOOKEEPER-795. eventThread isn't shutdown after a connection "session expired" 
event coming (Sergey Doroshenko and Ben via mahadev)

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=982485&r1=982484&r2=982485&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Aug  5 07:09:02 2010
@@ -75,6 +75,9 @@ BUGFIXES: 
   ZOOKEEPER-790.  Last processed zxid set prematurely while establishing 
   leadership (flavio via mahadev)
 
+  ZOOKEEPER-795. eventThread isn't shutdown after a connection 
+  "session expired" event coming (Sergey Doroshenko and Ben via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=982485&r1=982484&r2=982485&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Aug  5 07:09:02 2010
@@ -430,6 +430,17 @@ public class ClientCnxn {
         }
     }
 
+    /**
+     * Guard against creating "-EventThread-EventThread-EventThread-..." thread
+     * names when ZooKeeper object is being created from within a watcher.
+     * See ZOOKEEPER-795 for details.
+     */
+    private static String makeThreadName(String suffix) {
+        String name = Thread.currentThread().getName().
+            replaceAll("-EventThread", "");
+        return name + suffix;
+    }
+
     class EventThread extends Thread {
         private final LinkedBlockingQueue<Object> waitingEvents =
             new LinkedBlockingQueue<Object>();
@@ -441,7 +452,7 @@ public class ClientCnxn {
         private volatile KeeperState sessionState = KeeperState.Disconnected;
 
         EventThread() {
-            super(currentThread().getName() + "-EventThread");
+            super(makeThreadName("-EventThread"));
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
         }
@@ -689,6 +700,7 @@ public class ClientCnxn {
                 eventThread.queueEvent(new WatchedEvent(
                         Watcher.Event.EventType.None,
                         Watcher.Event.KeeperState.Expired, null));
+                eventThread.queueEventOfDeath();
                 throw new SessionExpiredException(
                         "Unable to reconnect to ZooKeeper service, session 0x"
                         + Long.toHexString(sessionId) + " has expired");
@@ -898,7 +910,7 @@ public class ClientCnxn {
         }
 
         SendThread() {
-            super(currentThread().getName() + "-SendThread()");
+            super(makeThreadName("-SendThread()"));
             zooKeeper.state = States.CONNECTING;
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=982485&r1=982484&r2=982485&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Thu Aug  5 07:09:02 2010
@@ -23,7 +23,9 @@ import static org.apache.zookeeper.test.
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -194,18 +196,55 @@ public class SessionTest extends ZKTestC
         zk.close();
     }
 
+    private List<Thread> findThreads(String name) {
+        int threadCount = Thread.activeCount();
+        Thread threads[] = new Thread[threadCount*2];
+        threadCount = Thread.enumerate(threads);
+        ArrayList<Thread> list = new ArrayList<Thread>();
+        for(int i = 0; i < threadCount; i++) {
+            if (threads[i].getName().indexOf(name) != -1) {
+                list.add(threads[i]);
+            }
+        }
+        return list;
+    }
+
     /**
      * Make sure ephemerals get cleaned up when a session times out.
      */
     @Test
     public void testSessionTimeout() throws Exception {
         final int TIMEOUT = 5000;
+        List<Thread> etBefore = findThreads("EventThread");
+        List<Thread> stBefore = findThreads("SendThread");
         DisconnectableZooKeeper zk = createClient(TIMEOUT);
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL);
-        zk.disconnect();
+
+        // Find the new event and send threads
+        List<Thread> etAfter = findThreads("EventThread");
+        List<Thread> stAfter = findThreads("SendThread");
+        Thread eventThread = null;
+        Thread sendThread = null;
+        for(Thread t: etAfter) {
+            if (!etBefore.contains(t)) {
+                eventThread = t;
+                break;
+            }
+        }
+        for(Thread t: stAfter) {
+            if (!stBefore.contains(t)) {
+                sendThread = t;
+                break;
+            }
+        }
+        sendThread.suspend();
+        //zk.disconnect();
 
         Thread.sleep(TIMEOUT*2);
+        sendThread.resume();
+        eventThread.join(TIMEOUT);
+        Assert.assertFalse("EventThread is still running", eventThread.isAlive());
 
         zk = createClient(TIMEOUT);
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,


Reply via email to