Author: breed
Date: Wed Aug 27 16:36:40 2008
New Revision: 689668

URL: http://svn.apache.org/viewvc?rev=689668&view=rev
Log:
 ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed)


Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Aug 27 16:36:40 2008
@@ -47,3 +47,5 @@
 
  ZOOKEEPER-125. Remove unwanted class declaration in FastLeaderElection. 
  (Flavio Paiva Junqueira via mahadev)
+
+ ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Aug 27 16:36:40 2008
@@ -34,11 +34,10 @@
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.log4j.Logger;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
+import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
@@ -63,7 +62,6 @@
 import org.apache.zookeeper.proto.SetDataResponse;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
 
 /**
@@ -75,7 +73,8 @@
 class ClientCnxn {
     private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
 
-    private ArrayList<InetSocketAddress> serverAddrs = new 
ArrayList<InetSocketAddress>();
+    private ArrayList<InetSocketAddress> serverAddrs =
+        new ArrayList<InetSocketAddress>();
 
     static class AuthData {
         AuthData(String scheme, byte data[]) {
@@ -122,6 +121,12 @@
     final EventThread eventThread;
 
     final Selector selector = Selector.open();
+    
+    /** Set to true when close is called. Latches the connection such that
+     * we don't attempt to re-connect to the server if in the middle of
+     * closing the connection (client sends session disconnect to server
+     * as part of close operation) */
+    volatile boolean closing = false;
 
     public long getSessionId() {
         return sessionId;
@@ -253,7 +258,7 @@
 
     class EventThread extends Thread {
         EventThread() {
-            super("EventThread");
+            super(currentThread().getName() + "-EventThread");
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
         }
@@ -341,7 +346,10 @@
                     }
                 }
             } catch (InterruptedException e) {
+                LOG.warn("Event thread exiting due to interruption", e);
             }
+            
+            LOG.info("EventThread shut down");
         }
     }
 
@@ -566,7 +574,7 @@
         }
 
         SendThread() {
-            super("SendThread");
+            super(currentThread().getName() + "-SendThread");
             zooKeeper.state = States.CONNECTING;
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
@@ -666,6 +674,10 @@
             while (zooKeeper.state.isAlive()) {
                 try {
                     if (sockKey == null) {
+                        // don't re-establish connection if we are closing
+                        if (closing) {
+                            break;
+                        }
                         startConnect();
                         lastSend = now;
                         lastHeard = now;
@@ -730,21 +742,34 @@
                     }
                     selected.clear();
                 } catch (Exception e) {
-                    LOG.warn("Closing session 0x" 
-                            + Long.toHexString(getSessionId()),
-                            e);
-                    cleanup();
-                    if (zooKeeper.state.isAlive()) {
-                        waitingEvents.add(new WatcherEvent(Event.EventNone,
-                                Event.KeeperStateDisconnected, null));
+                    if (closing) {
+                        // closing so this is expected
+                        LOG.info("Exception while closing send thread for 
session 0x" 
+                                + Long.toHexString(getSessionId())
+                                + " : " + e.getMessage());
+                        break;
+                    } else {
+                        LOG.warn("Exception closing session 0x" 
+                                + Long.toHexString(getSessionId()),
+                                e);
+                        cleanup();
+                        if (zooKeeper.state.isAlive()) {
+                            waitingEvents.add(new WatcherEvent(Event.EventNone,
+                                    Event.KeeperStateDisconnected, null));
+                        }
+    
+                        now = System.currentTimeMillis();
+                        lastHeard = now;
+                        lastSend = now;
                     }
-
-                    now = System.currentTimeMillis();
-                    lastHeard = now;
-                    lastSend = now;
                 }
             }
             cleanup();
+            try {
+                selector.close();
+            } catch (IOException e) {
+                LOG.warn("Ignoring exception during selector close", e);
+            }
             ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                      "SendThread exitedloop.");
         }
@@ -755,25 +780,29 @@
                 sockKey.cancel();
                 try {
                     sock.socket().shutdownInput();
-                } catch (IOException e2) {
+                } catch (IOException e) {
+                    LOG.warn("Ignoring exception during shutdown input", e);
                 }
                 try {
                     sock.socket().shutdownOutput();
-                } catch (IOException e2) {
+                } catch (IOException e) {
+                    LOG.warn("Ignoring exception during shutdown output", e);
                 }
                 try {
                     sock.socket().close();
-                } catch (IOException e1) {
+                } catch (IOException e) {
+                    LOG.warn("Ignoring exception during socket close", e);
                 }
                 try {
                     sock.close();
-                } catch (IOException e1) {
+                } catch (IOException e) {
+                    LOG.warn("Ignoring exception during channel close", e);
                 }
             }
             try {
                 Thread.sleep(100);
-            } catch (InterruptedException e1) {
-                e1.printStackTrace();
+            } catch (InterruptedException e) {
+                LOG.warn("SendThread interrupted during sleep, ignoring");
             }
             sockKey = null;
             synchronized (pendingQueue) {
@@ -798,15 +827,44 @@
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public void close() throws IOException {
-        LOG.info("Closing ClientCnxn for session: 0x" 
+    /**
+     * Shutdown the send/event threads. This method should not be called
+     * directly - rather it should be called as part of close operation. This
+     * method is primarily here to allow the tests to verify disconnection
+     * behavior.
+     */
+    public void disconnect() {
+        LOG.info("Disconnecting ClientCnxn for session: 0x" 
                 + Long.toHexString(getSessionId()));
 
         sendThread.close();
         waitingEvents.add(eventOfDeath);
     }
 
+    /**
+     * Close the connection, which includes sending a session disconnect to
+     * the server and shutting down the send/event threads.
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException {
+        LOG.info("Closing ClientCnxn for session: 0x" 
+                + Long.toHexString(getSessionId()));
+
+        closing = true;
+        
+        try {
+            RequestHeader h = new RequestHeader();
+            h.setType(ZooDefs.OpCode.closeSession);
+            
+            submitRequest(h, null, null, null);
+        } catch (InterruptedException e) {
+            // ignore, close the send/event threads
+        } finally {
+            disconnect();
+        }
+    }
+
     private int xid = 1;
 
     synchronized private int getXid() {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Aug 27 16:36:40 2008
@@ -312,9 +312,6 @@
     public synchronized void close() throws InterruptedException {
         LOG.info("Closing session: 0x" + Long.toHexString(getSessionId()));
 
-        RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.closeSession);
-        cnxn.submitRequest(h, null, null, null);
         try {
             cnxn.close();
         } catch (IOException e) {
@@ -940,7 +937,10 @@
 
     // Everything below this line is for testing!
 
+    /** Testing only!!! Really this needs to be moved into a stub in the
+     * tests - pending JIRA for that.
+     */
     public void disconnect() throws IOException {
-        cnxn.close();
+        cnxn.disconnect();
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java Wed Aug 27 16:36:40 2008
@@ -45,12 +45,11 @@
 import org.junit.Test;
 
 public class AsyncTest extends TestCase
-    implements Watcher, StringCallback, VoidCallback, DataCallback
+    implements StringCallback, VoidCallback, DataCallback
 {
     private static final Logger LOG = Logger.getLogger(AsyncTest.class);
 
     private QuorumTest quorumTest = new QuorumTest();
-    private CountDownLatch clientConnected;
 
     private volatile boolean bang;
 
@@ -72,11 +71,20 @@
     @Override
     protected void tearDown() throws Exception {
         LOG.info("Test clients shutting down");
-        clientConnected = null;
         quorumTest.tearDown();
         LOG.info("FINISHED " + getName());
     }
 
+    private static class CountdownWatcher implements Watcher {
+        volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+        public void process(WatcherEvent event) {
+            if (event.getState() == Event.KeeperStateSyncConnected) {
+                clientConnected.countDown();
+            }
+        }
+    }
+    
     private ZooKeeper createClient() throws IOException,InterruptedException {
         return createClient(quorumTest.hostPort);
     }
@@ -84,9 +92,11 @@
     private ZooKeeper createClient(String hp)
         throws IOException, InterruptedException
     {
-        clientConnected = new CountDownLatch(1);
-        ZooKeeper zk = new ZooKeeper(hp, 30000, this);
-        if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(hp, 30000, watcher);
+        if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS))
+        {
             fail("Unable to connect to server");
         }
         return zk;
@@ -239,12 +249,6 @@
         }
     }
 
-    public void process(WatcherEvent event) {
-        if(event.getState()==Event.KeeperStateSyncConnected){
-            clientConnected.countDown();
-        }
-    }
-
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, String name) {
         synchronized(ctx) {

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Aug 27 16:36:40 2008
@@ -25,10 +25,16 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.SyncRequestProcessor;
@@ -53,6 +59,51 @@
         super(name);
     }
 
+    /**
+     * In general don't use this. Only use in the special case that you
+     * want to ignore results (for whatever reason) in your test. Don't
+     * use empty watchers in real code!
+     *
+     */
+    protected class NullWatcher implements Watcher {
+        public void process(WatcherEvent event) { /* nada */ }
+    }
+
+    protected static class CountdownWatcher implements Watcher {
+        volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+        public void process(WatcherEvent event) {
+            if (event.getState() == Event.KeeperStateSyncConnected) {
+                clientConnected.countDown();
+            }
+        }
+    }
+    
+    protected ZooKeeper createClient()
+        throws IOException, InterruptedException
+    {
+        return createClient(hostPort);
+    }
+
+    protected ZooKeeper createClient(String hp)
+        throws IOException, InterruptedException
+    {
+        CountdownWatcher watcher = new CountdownWatcher();
+        return createClient(watcher, hp);
+    }
+
+    protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
+        throws IOException, InterruptedException
+    {
+        ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
+        if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS))
+        {
+            fail("Unable to connect to server");
+        }
+        return zk;
+    }
+
     public static boolean waitForServerUp(String hp, long timeout) {
         long start = System.currentTimeMillis();
         String split[] = hp.split(":");
@@ -242,4 +293,44 @@
         return d.delete();
     }
 
+    /*
+     * Verify that all of the servers see the same number of nodes
+     * at the root
+     */
+    void verifyRootOfAllServersMatch(String hostPort)
+        throws InterruptedException, KeeperException, IOException
+    {
+        String parts[] = hostPort.split(",");
+
+        // run through till the counts no longer change on each server
+        // max 100 tries, with 10 second sleeps between tries
+        int[] counts = new int[parts.length];
+        for (int j = 0; j < 100; j++) {
+            int newcounts[] = new int[parts.length];
+            int i = 0;
+            for (String hp : parts) {
+                ZooKeeper zk = createClient(hp);
+                try {
+                    newcounts[i++] = zk.getChildren("/", false).size();
+                } finally {
+                    zk.close();
+                }
+            }
+
+            if (Arrays.equals(newcounts, counts)) {
+                LOG.info("Found match with array:"
+                        + Arrays.toString(newcounts));
+                counts = newcounts;
+                break;
+            } else {
+                counts = newcounts;
+                Thread.sleep(10000);
+            }
+        }
+
+        // verify all the servers reporting same number of nodes
+        for (int i = 1; i < parts.length; i++) {
+            assertEquals("node count not consistent", counts[i-1], counts[i]);
+        }
+    }
 }
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java Wed Aug 27 16:36:40 2008
@@ -20,20 +20,18 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.ZooDefs.CreateFlags;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
@@ -43,33 +41,14 @@
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.junit.Test;
 
-public class ClientTest extends ClientBase implements Watcher {
+public class ClientTest extends ClientBase {
     protected static final Logger LOG = Logger.getLogger(ClientTest.class);
 
     LinkedBlockingQueue<WatcherEvent> events =
         new LinkedBlockingQueue<WatcherEvent>();
-    protected volatile CountDownLatch clientConnected;
-
-    protected ZooKeeper createClient(Watcher watcher)
-        throws IOException, InterruptedException
-    {
-        return createClient(watcher, hostPort);
-    }
-
-    protected ZooKeeper createClient(Watcher watcher, String hp)
-        throws IOException, InterruptedException
-    {
-        clientConnected = new CountDownLatch(1);
-        ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
-        if (!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) 
{
-            fail("Unable to connect to server");
-        }
-        return zk;
-    }
 
     @Override
     protected void tearDown() throws Exception {
-        clientConnected = null;
         super.tearDown();
         LOG.info("FINISHED " + getName());
     }
@@ -79,8 +58,8 @@
         ZooKeeper zkIdle = null;
         ZooKeeper zkWatchCreator = null;
         try {
-            zkIdle = createClient(this);
-            zkWatchCreator = createClient(this);
+            zkIdle = createClient();
+            zkWatchCreator = createClient();
             for (int i = 0; i < 30; i++) {
                 zkWatchCreator.create("/" + i, new byte[0], 
Ids.OPEN_ACL_UNSAFE, 0);
             }
@@ -121,12 +100,13 @@
     public void testACLs() throws Exception {
         ZooKeeper zk = null;
         try {
-            zk = createClient(this);
+            zk = createClient();
             try {
                 zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
                 fail("Should have received an invalid acl error");
             } catch(InvalidACLException e) {
-                LOG.error("Invalid acl", e);
+                LOG.info("Test successful, invalid acl received : "
+                        + e.getMessage());
             }
             try {
                 ArrayList<ACL> testACL = new ArrayList<ACL>();
@@ -135,12 +115,13 @@
                 zk.create("/acltest", new byte[0], testACL, 0);
                 fail("Should have received an invalid acl error");
             } catch(InvalidACLException e) {
-                LOG.error("Invalid acl", e);
+                LOG.info("Test successful, invalid acl received : "
+                        + e.getMessage());
             }
             zk.addAuthInfo("digest", "ben:passwd".getBytes());
             zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
             zk.close();
-            zk = createClient(this);
+            zk = createClient();
             zk.addAuthInfo("digest", "ben:passwd2".getBytes());
             try {
                 zk.getData("/acltest", false, new Stat());
@@ -152,7 +133,7 @@
             zk.getData("/acltest", false, new Stat());
             zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
             zk.close();
-            zk = createClient(this);
+            zk = createClient();
             zk.getData("/acltest", false, new Stat());
             List<ACL> acls = zk.getACL("/acltest", new Stat());
             assertEquals(1, acls.size());
@@ -165,11 +146,25 @@
         }
     }
 
-    private void performClientTest(boolean withWatcherObj) throws IOException,
-            InterruptedException, KeeperException {
+    protected class MyWatcher extends CountdownWatcher {
+        public void process(WatcherEvent event) {
+            super.process(event);
+            if (event.getType() != Event.EventNone) {
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put");
+                }
+            }
+        }
+    }
+
+    private void performClientTest(boolean withWatcherObj)
+        throws IOException, InterruptedException, KeeperException
+    {
         ZooKeeper zk = null;
         try {
-            zk =createClient(this);
+            zk = createClient(new MyWatcher(), hostPort);
             //LOG.info("Created client: " + zk.describeCNXN());
             LOG.info("Before create /benwashere");
             zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
@@ -188,7 +183,7 @@
             zk.close();
             //LOG.info("Closed client: " + zk.describeCNXN());
             Thread.sleep(2000);
-            zk = createClient(this);
+            zk = createClient(new MyWatcher(), hostPort);
             //LOG.info("Created a new client: " + zk.describeCNXN());
             LOG.info("Before delete /");
 
@@ -213,7 +208,7 @@
                 if (withWatcherObj) {
                     assertEquals(null, zk.exists("/frog", new MyWatcher()));
                 } else {
-                assertEquals(null, zk.exists("/frog", true));
+                    assertEquals(null, zk.exists("/frog", true));
                 }
                 LOG.info("Comment: asseting passed for frog setting /");
             } catch (KeeperException.NoNodeException e) {
@@ -292,14 +287,17 @@
 
     // Test that sequential filenames are being created correctly,
     // with 0-padding in the filename
-    public void testSequentialNodeNames() throws IOException, 
InterruptedException, KeeperException {
+    @Test
+    public void testSequentialNodeNames()
+        throws IOException, InterruptedException, KeeperException
+    {
         String path = "/SEQUENCE";
-    String file = "TEST";
-    String filepath = path + "/" + file;
+        String file = "TEST";
+        String filepath = path + "/" + file;
 
         ZooKeeper zk = null;
         try {
-            zk =createClient(this);
+            zk = createClient();
             zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
             zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateFlags.SEQUENCE);
             List<String> children = zk.getChildren(path, false);
@@ -351,7 +349,7 @@
 
     @Test
     public void testDeleteWithChildren() throws Exception {
-        ZooKeeper zk = createClient(this);
+        ZooKeeper zk = createClient();
         zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
         zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
         try {
@@ -364,20 +362,27 @@
         zk.delete("/parent", -1);
         zk.close();
     }
-
-    private static class HammerThread extends Thread {
-        private static final long LATENCY = 5;
+    
+    private static final long HAMMERTHREAD_LATENCY = 5;
+    
+    private static abstract class HammerThread extends Thread {
+        protected final int count;
+        protected volatile int current = 0;
         
+        HammerThread(String name, int count) {
+            super(name);
+            this.count = count;
+        }
+    }
+
+    private static class BasicHammerThread extends HammerThread {
         private final ZooKeeper zk;
         private final String prefix;
-        private final int count;
-        private volatile int current = 0;
 
-        HammerThread(String name, ZooKeeper zk, String prefix, int count) {
-            super(name);
+        BasicHammerThread(String name, ZooKeeper zk, String prefix, int count) 
{
+            super(name, count);
             this.zk = zk;
             this.prefix = prefix;
-            this.count = count;
         }
 
         public void run() {
@@ -385,87 +390,129 @@
             try {
                 for (; current < count; current++) {
                     // Simulate a bit of network latency...
-                    Thread.sleep(LATENCY);
+                    Thread.sleep(HAMMERTHREAD_LATENCY);
                     zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
                 }
-            } catch (Exception e) {
-                LOG.error("Client create operation failed", e);
+            } catch (Throwable t) {
+                LOG.error("Client create operation failed", t);
             } finally {
-                if (zk != null) {
-                    try {
-                        zk.close();
-                    } catch (InterruptedException e) {
-                        LOG.warn("Unexpected", e);
-                    }
+                try {
+                    zk.close();
+                } catch (InterruptedException e) {
+                    LOG.warn("Unexpected", e);
                 }
             }
         }
     }
 
-    /*
-     * Verify that all of the servers see the same number of nodes
-     * at the root
-     */
-    void verifyRootOfAllServersMatch(String hostPort)
-        throws InterruptedException, KeeperException, IOException
-    {
-        String parts[] = hostPort.split(",");
+    private static class SuperHammerThread extends HammerThread {
+        private final ClientTest parent;
+        private final String prefix;
 
-        // run through till the counts no longer change on each server
-        // max 15 tries, with 2 second sleeps, so approx 30 seconds
-        int[] counts = new int[parts.length];
-        for (int j = 0; j < 100; j++) {
-            int newcounts[] = new int[parts.length];
-            int i = 0;
-            for (String hp : parts) {
-                ZooKeeper zk = createClient(this, hp);
-                try {
-                    newcounts[i++] = zk.getChildren("/", false).size();
-                } finally {
-                    zk.close();
-                }
-            }
+        SuperHammerThread(String name, ClientTest parent, String prefix,
+                int count)
+        {
+            super(name, count);
+            this.parent = parent;
+            this.prefix = prefix;
+        }
 
-            if (Arrays.equals(newcounts, counts)) {
-                LOG.info("Found match with array:"
-                        + Arrays.toString(newcounts));
-                counts = newcounts;
-                break;
-            } else {
-                counts = newcounts;
-                Thread.sleep(10000);
+        public void run() {
+            byte b[] = new byte[256];
+            try {
+                for (; current < count; current++) {
+                    ZooKeeper zk = parent.createClient();
+                    try {
+                        zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
+                    } finally {
+                        try {
+                            zk.close();
+                        } catch (InterruptedException e) {
+                            LOG.warn("Unexpected", e);
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("Client create operation failed", t);
             }
         }
+    }
 
-        // verify all the servers reporting same number of nodes
-        for (int i = 1; i < parts.length; i++) {
-            assertEquals("node count not consistent", counts[i-1], counts[i]);
+    /**
+     * Separate threads each creating a number of nodes. Each thread
+     * is using a non-shared (owned by thread) client for all node creations.
+     * @throws Throwable
+     */
+    @Test
+    public void testHammerBasic() throws Throwable {
+        try {
+            final int threadCount = 10;
+            final int childCount = 1000;
+            
+            HammerThread[] threads = new HammerThread[threadCount];
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < threads.length; i++) {
+                ZooKeeper zk = createClient();
+                String prefix = "/test-" + i;
+                zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                prefix += "/";
+                HammerThread thread = 
+                    new BasicHammerThread("BasicHammerThread-" + i, zk, prefix,
+                            childCount);
+                thread.start();
+                
+                threads[i] = thread;
+            }
+            
+            verifyHammer(start, threads, childCount);
+        } catch (Throwable t) {
+            LOG.error("test failed", t);
+            throw t;
         }
     }
-
-
+    
+    /**
+     * Separate threads each creating a number of nodes. Each thread
+     * is creating a new client for each node creation.
+     * @throws Throwable
+     */
     @Test
-    public void testHammer() 
-        throws IOException, InterruptedException, KeeperException 
-    {
-        final int threadCount = 10;
-        final int childCount = 1000;
-        
-        HammerThread[] threads = new HammerThread[threadCount];
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < threads.length; i++) {
-            Thread.sleep(10);
-            ZooKeeper zk = createClient(this);
-            String prefix = "/test-" + i;
-            zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
-            prefix += "/";
-            HammerThread thread = 
-                new HammerThread("HammerThread-" + i, zk, prefix, childCount);
-            thread.start();
+    public void testHammerSuper() throws Throwable {
+        try {
+            final int threadCount = 5;
+            final int childCount = 10;
             
-            threads[i] = thread;
+            HammerThread[] threads = new HammerThread[threadCount];
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < threads.length; i++) {
+                String prefix = "/test-" + i;
+                {
+                    ZooKeeper zk = createClient();
+                    try {
+                        zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                    } finally {
+                        zk.close();
+                    }
+                }
+                prefix += "/";
+                HammerThread thread = 
+                    new SuperHammerThread("SuperHammerThread-" + i, this,
+                            prefix, childCount);
+                thread.start();
+                
+                threads[i] = thread;
+            }
+            
+            verifyHammer(start, threads, childCount);
+        } catch (Throwable t) {
+            LOG.error("test failed", t);
+            throw t;
         }
-        
+    }
+    
+    public void verifyHammer(long start, HammerThread[] threads, int 
childCount) 
+        throws IOException, InterruptedException, KeeperException 
+    {
         // look for the clients to finish their create operations
         LOG.info("Starting check for completed hammers");
         int workingCount = threads.length;
@@ -493,44 +540,75 @@
         for (HammerThread h : threads) {
             final int safetyFactor = 3;
             verifyThreadTerminated(h,
-                    threadCount * childCount 
-                    * HammerThread.LATENCY * safetyFactor);
+                    threads.length * childCount 
+                    * HAMMERTHREAD_LATENCY * safetyFactor);
         }
         LOG.info(new Date() + " Total time "
                 + (System.currentTimeMillis() - start));
         
-        ZooKeeper zk = createClient(this);
-        
-        LOG.info("******************* Connected to ZooKeeper" + new Date());
-        for (int i = 0; i < threadCount; i++) {
-            LOG.info("Doing thread: " + i + " " + new Date());
-            List<String> children =
-                zk.getChildren("/test-" + i, false);
-            assertEquals(childCount, children.size());
-        }
-        for (int i = 0; i < threadCount; i++) {
-            List<String> children =
-                zk.getChildren("/test-" + i, false);
-            assertEquals(childCount, children.size());
+        ZooKeeper zk = createClient();
+        try {
+            
+            LOG.info("******************* Connected to ZooKeeper" + new Date());
+            for (int i = 0; i < threads.length; i++) {
+                LOG.info("Doing thread: " + i + " " + new Date());
+                List<String> children =
+                    zk.getChildren("/test-" + i, false);
+                assertEquals(childCount, children.size());
+            }
+            for (int i = 0; i < threads.length; i++) {
+                List<String> children =
+                    zk.getChildren("/test-" + i, false);
+                assertEquals(childCount, children.size());
+            }
+        } finally {
+            zk.close();
         }
     }
-
-    public class MyWatcher implements Watcher {
-        public void process(WatcherEvent event) {
-            ClientTest.this.process(event);
+    
+    private class VerifyClientCleanup extends Thread {
+        int count;
+        int current = 0;
+        
+        VerifyClientCleanup(String name, int count) {
+            super(name);
+            this.count = count;
+        }
+        
+        public void run() {
+            try {
+                for (; current < count; current++) {
+                    ZooKeeper zk = createClient();
+                    zk.close();
+                }
+            } catch (Throwable t) {
+                LOG.error("test failed", t);
+            }
         }
     }
 
-    public void process(WatcherEvent event) {
-        if (event.getState() == Event.KeeperStateSyncConnected) {
-            clientConnected.countDown();
+    /**
+     * Verify that the client is cleaning up properly. Open/close a large
+     * number of sessions. Essentially looking to see if sockets/selectors
+     * are being cleaned up properly during close.
+     * 
+     * @throws Throwable
+     */
+    @Test
+    public void testClientCleanup() throws Throwable {
+        final int threadCount = 20;
+        final int clientCount = 100;
+        
+        VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount];
+        
+        for (int i = 0; i < threads.length; i++) {
+            threads[i] = new VerifyClientCleanup("VCC" + i, clientCount);
+            threads[i].start();
         }
-        if (event.getType() != Event.EventNone) {
-            try {
-                events.put(event);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+        
+        for (int i = 0; i < threads.length; i++) {
+            threads[i].join(600000);
+            assertTrue(threads[i].current == threads[i].count);
         }
     }
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java Wed Aug 27 16:36:40 2008
@@ -43,7 +43,6 @@
         LOG.info("FINISHED " + getName());
     }
 
-
     public void testRootWatchTriggered() throws Exception {
         class MyWatcher implements Watcher{
             boolean fired=false;

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Aug 27 16:36:40 2008
@@ -24,17 +24,22 @@
 import java.util.ArrayList;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumStats;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
-public class QuorumTest extends ClientTest {
+public class QuorumTest extends ClientBase {
     private static final Logger LOG = Logger.getLogger(QuorumTest.class);
 
+    private ClientTest ct = new ClientTest();
+
     File s1dir, s2dir, s3dir, s4dir, s5dir;
     QuorumPeer s1, s2, s3, s4, s5;
+
     @Before
     @Override
     protected void setUp() throws Exception {
@@ -43,6 +48,7 @@
         setupTestEnv();
 
         hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
+        ct.hostPort = hostPort;
 
         s1dir = ClientBase.createTmpDir();
         s2dir = ClientBase.createTmpDir();
@@ -126,4 +132,47 @@
             LOG.debug("QP interrupted", e);
         }
     }
+
+    @Test
+    public void testDeleteWithChildren() throws Exception {
+        ct.testDeleteWithChildren();
+    }
+    
+    @Test
+    public void testHammerBasic() throws Throwable {
+        ct.testHammerBasic();
+    }
+    
+    @Test
+    public void testPing() throws Exception {
+        ct.testPing();
+    }
+    
+    @Test
+    public void testSequentialNodeNames()
+        throws IOException, InterruptedException, KeeperException
+    {
+        ct.testSequentialNodeNames();
+    }
+
+    @Test
+    public void testACLs() throws Exception {
+        ct.testACLs();
+    }
+
+    @Test
+    public void testClientwithoutWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        ct.testClientwithoutWatcherObj();
+    }
+
+    @Test
+    public void testClientWithWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        ct.testClientWithWatcherObj();
+    }
+    
+    // skip superhammer and clientcleanup as they are too expensive for quorum
 }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Wed Aug 27 16:36:40 2008
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
@@ -44,9 +45,9 @@
 
     private static final String HOSTPORT = "127.0.0.1:33299";
 
-    private CountDownLatch startSignal;
-
     private NIOServerCnxn.Factory serverFactory;
+    
+    private CountDownLatch startSignal;
 
     @Override
     protected void setUp() throws Exception {
@@ -78,12 +79,26 @@
         LOG.info("FINISHED " + getName());
     }
 
+    private static class CountdownWatcher implements Watcher {
+        volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+        public void process(WatcherEvent event) {
+            if (event.getState() == Event.KeeperStateSyncConnected) {
+                clientConnected.countDown();
+            }
+        }
+    }
+
     private ZooKeeper createClient()
         throws IOException, InterruptedException
     {
-        startSignal = new CountDownLatch(1);
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
-        startSignal.await();
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher);
+        if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS))
+        {
+            fail("Unable to connect to server");
+        }
 
         return zk;
     }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java Wed Aug 27 16:36:40 2008
@@ -25,20 +25,17 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.proto.WatcherEvent;
 import org.junit.Test;
 
 
 public class SyncCallTest extends ClientBase
-    implements Watcher, ChildrenCallback, StringCallback, VoidCallback
+    implements ChildrenCallback, StringCallback, VoidCallback
 {
-    private CountDownLatch clientConnected;
     private CountDownLatch opsCount;
     
     List<Integer> results = new LinkedList<Integer>();
@@ -59,7 +56,7 @@
             for(int i = 0; i < 100; i++)
                 zk.delete("/test" + i, 0, this, results);
             for(int i = 0; i < 100; i++)
-                zk.getChildren("/", this, this, results);
+                zk.getChildren("/", new NullWatcher(), this, results);
 
             LOG.info("Submitted all operations:" + (new Date()).toString());
             
@@ -74,22 +71,6 @@
             System.out.println(e.toString());
         } 
     }
-    
-    private ZooKeeper createClient() throws IOException,InterruptedException{
-        clientConnected=new CountDownLatch(1);
-        ZooKeeper zk = new ZooKeeper(hostPort, 30000, this);
-        if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
-            fail("Unable to connect to server");
-        }
-        return zk;
-    }
-    
-    public void process(WatcherEvent event) {
-        //LOG.info("Process: " + event.getType() + " " + event.getPath());
-        if (event.getState() == Event.KeeperStateSyncConnected) {
-            clientConnected.countDown();
-        }
-    }
 
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx,


Reply via email to