Author: breed Date: Wed Aug 27 16:36:40 2008 New Revision: 689668 URL: http://svn.apache.org/viewvc?rev=689668&view=rev Log: ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed)
Modified: hadoop/zookeeper/trunk/CHANGES.txt hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java Modified: hadoop/zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/CHANGES.txt (original) +++ hadoop/zookeeper/trunk/CHANGES.txt Wed Aug 27 16:36:40 2008 @@ -47,3 +47,5 @@ ZOOKEEPER-125. Remove unwanted class declaration in FastLeaderElection. (Flavio Paiva Junqueira via mahadev) + + ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed) Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Aug 27 16:36:40 2008 @@ -34,11 +34,10 @@ import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.log4j.Logger; - import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; +import org.apache.log4j.Logger; import org.apache.zookeeper.AsyncCallback.ACLCallback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.DataCallback; @@ -63,7 +62,6 @@ import org.apache.zookeeper.proto.SetDataResponse; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.ByteBufferInputStream; -import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooTrace; /** @@ -75,7 +73,8 @@ class ClientCnxn { private static final Logger LOG = Logger.getLogger(ClientCnxn.class); - private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>(); + private ArrayList<InetSocketAddress> serverAddrs = + new ArrayList<InetSocketAddress>(); static class AuthData { AuthData(String scheme, byte data[]) { @@ -122,6 +121,12 @@ final EventThread eventThread; final Selector selector = Selector.open(); + + /** Set to true when close is called. Latches the connection such that + * we don't attempt to re-connect to the server if in the middle of + * closing the connection (client sends session disconnect to server + * as part of close operation) */ + volatile boolean closing = false; public long getSessionId() { return sessionId; @@ -253,7 +258,7 @@ class EventThread extends Thread { EventThread() { - super("EventThread"); + super(currentThread().getName() + "-EventThread"); setUncaughtExceptionHandler(uncaughtExceptionHandler); setDaemon(true); } @@ -341,7 +346,10 @@ } } } catch (InterruptedException e) { + LOG.warn("Event thread exiting due to interruption", e); } + + LOG.info("EventThread shut down"); } } @@ -566,7 +574,7 @@ } SendThread() { - super("SendThread"); + super(currentThread().getName() + "-SendThread"); zooKeeper.state = States.CONNECTING; setUncaughtExceptionHandler(uncaughtExceptionHandler); setDaemon(true); @@ -666,6 +674,10 @@ while (zooKeeper.state.isAlive()) { try { if (sockKey == null) { + // don't re-establish connection if we are closing + if (closing) { + break; + } startConnect(); lastSend = now; lastHeard = now; @@ -730,21 +742,34 @@ } selected.clear(); } catch (Exception e) { - LOG.warn("Closing session 0x" - + Long.toHexString(getSessionId()), - e); - cleanup(); - if (zooKeeper.state.isAlive()) { - waitingEvents.add(new WatcherEvent(Event.EventNone, - Event.KeeperStateDisconnected, null)); + if (closing) { + // closing so this is expected + LOG.info("Exception while closing send thread for session 0x" + + Long.toHexString(getSessionId()) + + " : " + e.getMessage()); + break; + } else { + LOG.warn("Exception closing session 0x" + + Long.toHexString(getSessionId()), + e); + cleanup(); + if (zooKeeper.state.isAlive()) { + waitingEvents.add(new WatcherEvent(Event.EventNone, + Event.KeeperStateDisconnected, null)); + } + + now = System.currentTimeMillis(); + lastHeard = now; + lastSend = now; } - - now = System.currentTimeMillis(); - lastHeard = now; - lastSend = now; } } cleanup(); + try { + selector.close(); + } catch (IOException e) { + LOG.warn("Ignoring exception during selector close", e); + } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exitedloop."); } @@ -755,25 +780,29 @@ sockKey.cancel(); try { sock.socket().shutdownInput(); - } catch (IOException e2) { + } catch (IOException e) { + LOG.warn("Ignoring exception during shutdown input", e); } try { sock.socket().shutdownOutput(); - } catch (IOException e2) { + } catch (IOException e) { + LOG.warn("Ignoring exception during shutdown output", e); } try { sock.socket().close(); - } catch (IOException e1) { + } catch (IOException e) { + LOG.warn("Ignoring exception during socket close", e); } try { sock.close(); - } catch (IOException e1) { + } catch (IOException e) { + LOG.warn("Ignoring exception during channel close", e); } } try { Thread.sleep(100); - } catch (InterruptedException e1) { - e1.printStackTrace(); + } catch (InterruptedException e) { + LOG.warn("SendThread interrupted during sleep, ignoring"); } sockKey = null; synchronized (pendingQueue) { @@ -798,15 +827,44 @@ } } - @SuppressWarnings("unchecked") - public void close() throws IOException { - LOG.info("Closing ClientCnxn for session: 0x" + /** + * Shutdown the send/event threads. This method should not be called + * directly - rather it should be called as part of close operation. This + * method is primarily here to allow the tests to verify disconnection + * behavior. + */ + public void disconnect() { + LOG.info("Disconnecting ClientCnxn for session: 0x" + Long.toHexString(getSessionId())); sendThread.close(); waitingEvents.add(eventOfDeath); } + /** + * Close the connection, which includes; send session disconnect to + * the server, shutdown the send/event threads. + * + * @throws IOException + */ + public void close() throws IOException { + LOG.info("Closing ClientCnxn for session: 0x" + + Long.toHexString(getSessionId())); + + closing = true; + + try { + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.closeSession); + + submitRequest(h, null, null, null); + } catch (InterruptedException e) { + // ignore, close the send/event threads + } finally { + disconnect(); + } + } + private int xid = 1; synchronized private int getXid() { Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original) +++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Aug 27 16:36:40 2008 @@ -312,9 +312,6 @@ public synchronized void close() throws InterruptedException { LOG.info("Closing session: 0x" + Long.toHexString(getSessionId())); - RequestHeader h = new RequestHeader(); - h.setType(ZooDefs.OpCode.closeSession); - cnxn.submitRequest(h, null, null, null); try { cnxn.close(); } catch (IOException e) { @@ -940,7 +937,10 @@ // Everything below this line is for testing! + /** Testing only!!! Really this needs to be moved into a stub in the + * tests - pending JIRA for that. + */ public void disconnect() throws IOException { - cnxn.close(); + cnxn.disconnect(); } } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java Wed Aug 27 16:36:40 2008 @@ -45,12 +45,11 @@ import org.junit.Test; public class AsyncTest extends TestCase - implements Watcher, StringCallback, VoidCallback, DataCallback + implements StringCallback, VoidCallback, DataCallback { private static final Logger LOG = Logger.getLogger(AsyncTest.class); private QuorumTest quorumTest = new QuorumTest(); - private CountDownLatch clientConnected; private volatile boolean bang; @@ -72,11 +71,20 @@ @Override protected void tearDown() throws Exception { LOG.info("Test clients shutting down"); - clientConnected = null; quorumTest.tearDown(); LOG.info("FINISHED " + getName()); } + private static class CountdownWatcher implements Watcher { + volatile CountDownLatch clientConnected = new CountDownLatch(1); + + public void process(WatcherEvent event) { + if (event.getState() == Event.KeeperStateSyncConnected) { + clientConnected.countDown(); + } + } + } + private ZooKeeper createClient() throws IOException,InterruptedException { return createClient(quorumTest.hostPort); } @@ -84,9 +92,11 @@ private ZooKeeper createClient(String hp) throws IOException, InterruptedException { - clientConnected = new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(hp, 30000, this); - if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){ + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(hp, 30000, watcher); + if(!watcher.clientConnected.await(CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS)) + { fail("Unable to connect to server"); } return zk; @@ -239,12 +249,6 @@ } } - public void process(WatcherEvent event) { - if(event.getState()==Event.KeeperStateSyncConnected){ - clientConnected.countDown(); - } - } - @SuppressWarnings("unchecked") public void processResult(int rc, String path, Object ctx, String name) { synchronized(ctx) { Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Aug 27 16:36:40 2008 @@ -25,10 +25,16 @@ import java.io.OutputStream; import java.net.Socket; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import junit.framework.TestCase; import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.ServerStats; import org.apache.zookeeper.server.SyncRequestProcessor; @@ -53,6 +59,51 @@ super(name); } + /** + * In general don't use this. Only use in the special case that you + * want to ignore results (for whatever reason) in your test. Don't + * use empty watchers in real code! + * + */ + protected class NullWatcher implements Watcher { + public void process(WatcherEvent event) { /* nada */ } + } + + protected static class CountdownWatcher implements Watcher { + volatile CountDownLatch clientConnected = new CountDownLatch(1); + + public void process(WatcherEvent event) { + if (event.getState() == Event.KeeperStateSyncConnected) { + clientConnected.countDown(); + } + } + } + + protected ZooKeeper createClient() + throws IOException, InterruptedException + { + return createClient(hostPort); + } + + protected ZooKeeper createClient(String hp) + throws IOException, InterruptedException + { + CountdownWatcher watcher = new CountdownWatcher(); + return createClient(watcher, hp); + } + + protected ZooKeeper createClient(CountdownWatcher watcher, String hp) + throws IOException, InterruptedException + { + ZooKeeper zk = new ZooKeeper(hp, 20000, watcher); + if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS)) + { + fail("Unable to connect to server"); + } + return zk; + } + public static boolean waitForServerUp(String hp, long timeout) { long start = System.currentTimeMillis(); String split[] = hp.split(":"); @@ -242,4 +293,44 @@ return d.delete(); } + /* + * Verify that all of the servers see the same number of nodes + * at the root + */ + void verifyRootOfAllServersMatch(String hostPort) + throws InterruptedException, KeeperException, IOException + { + String parts[] = hostPort.split(","); + + // run through till the counts no longer change on each server + // max 15 tries, with 2 second sleeps, so approx 30 seconds + int[] counts = new int[parts.length]; + for (int j = 0; j < 100; j++) { + int newcounts[] = new int[parts.length]; + int i = 0; + for (String hp : parts) { + ZooKeeper zk = createClient(hp); + try { + newcounts[i++] = zk.getChildren("/", false).size(); + } finally { + zk.close(); + } + } + + if (Arrays.equals(newcounts, counts)) { + LOG.info("Found match with array:" + + Arrays.toString(newcounts)); + counts = newcounts; + break; + } else { + counts = newcounts; + Thread.sleep(10000); + } + } + + // verify all the servers reporting same number of nodes + for (int i = 1; i < parts.length; i++) { + assertEquals("node count not consistent", counts[i-1], counts[i]); + } + } } \ No newline at end of file Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java Wed Aug 27 16:36:40 2008 @@ -20,20 +20,18 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.InvalidACLException; +import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.ZooDefs.CreateFlags; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; @@ -43,33 +41,14 @@ import org.apache.zookeeper.proto.WatcherEvent; import org.junit.Test; -public class ClientTest extends ClientBase implements Watcher { +public class ClientTest extends ClientBase { protected static final Logger LOG = Logger.getLogger(ClientTest.class); LinkedBlockingQueue<WatcherEvent> events = new LinkedBlockingQueue<WatcherEvent>(); - protected volatile CountDownLatch clientConnected; - - protected ZooKeeper createClient(Watcher watcher) - throws IOException, InterruptedException - { - return createClient(watcher, hostPort); - } - - protected ZooKeeper createClient(Watcher watcher, String hp) - throws IOException, InterruptedException - { - clientConnected = new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(hp, 20000, watcher); - if (!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { - fail("Unable to connect to server"); - } - return zk; - } @Override protected void tearDown() throws Exception { - clientConnected = null; super.tearDown(); LOG.info("FINISHED " + getName()); } @@ -79,8 +58,8 @@ ZooKeeper zkIdle = null; ZooKeeper zkWatchCreator = null; try { - zkIdle = createClient(this); - zkWatchCreator = createClient(this); + zkIdle = createClient(); + zkWatchCreator = createClient(); for (int i = 0; i < 30; i++) { zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); } @@ -121,12 +100,13 @@ public void testACLs() throws Exception { ZooKeeper zk = null; try { - zk = createClient(this); + zk = createClient(); try { zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0); fail("Should have received an invalid acl error"); } catch(InvalidACLException e) { - LOG.error("Invalid acl", e); + LOG.info("Test successful, invalid acl received : " + + e.getMessage()); } try { ArrayList<ACL> testACL = new ArrayList<ACL>(); @@ -135,12 +115,13 @@ zk.create("/acltest", new byte[0], testACL, 0); fail("Should have received an invalid acl error"); } catch(InvalidACLException e) { - LOG.error("Invalid acl", e); + LOG.info("Test successful, invalid acl received : " + + e.getMessage()); } zk.addAuthInfo("digest", "ben:passwd".getBytes()); zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0); zk.close(); - zk = createClient(this); + zk = createClient(); zk.addAuthInfo("digest", "ben:passwd2".getBytes()); try { zk.getData("/acltest", false, new Stat()); @@ -152,7 +133,7 @@ zk.getData("/acltest", false, new Stat()); zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1); zk.close(); - zk = createClient(this); + zk = createClient(); zk.getData("/acltest", false, new Stat()); List<ACL> acls = zk.getACL("/acltest", new Stat()); assertEquals(1, acls.size()); @@ -165,11 +146,25 @@ } } - private void performClientTest(boolean withWatcherObj) throws IOException, - InterruptedException, KeeperException { + protected class MyWatcher extends CountdownWatcher { + public void process(WatcherEvent event) { + super.process(event); + if (event.getType() != Event.EventNone) { + try { + events.put(event); + } catch (InterruptedException e) { + LOG.warn("ignoring interrupt during event.put"); + } + } + } + } + + private void performClientTest(boolean withWatcherObj) + throws IOException, InterruptedException, KeeperException + { ZooKeeper zk = null; try { - zk =createClient(this); + zk = createClient(new MyWatcher(), hostPort); //LOG.info("Created client: " + zk.describeCNXN()); LOG.info("Before create /benwashere"); zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0); @@ -188,7 +183,7 @@ zk.close(); //LOG.info("Closed client: " + zk.describeCNXN()); Thread.sleep(2000); - zk = createClient(this); + zk = createClient(new MyWatcher(), hostPort); //LOG.info("Created a new client: " + zk.describeCNXN()); LOG.info("Before delete /"); @@ -213,7 +208,7 @@ if (withWatcherObj) { assertEquals(null, zk.exists("/frog", new MyWatcher())); } else { - assertEquals(null, zk.exists("/frog", true)); + assertEquals(null, zk.exists("/frog", true)); } LOG.info("Comment: asseting passed for frog setting /"); } catch (KeeperException.NoNodeException e) { @@ -292,14 +287,17 @@ // Test that sequential filenames are being created correctly, // with 0-padding in the filename - public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException { + @Test + public void testSequentialNodeNames() + throws IOException, InterruptedException, KeeperException + { String path = "/SEQUENCE"; - String file = "TEST"; - String filepath = path + "/" + file; + String file = "TEST"; + String filepath = path + "/" + file; ZooKeeper zk = null; try { - zk =createClient(this); + zk = createClient(); zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE); List<String> children = zk.getChildren(path, false); @@ -351,7 +349,7 @@ @Test public void testDeleteWithChildren() throws Exception { - ZooKeeper zk = createClient(this); + ZooKeeper zk = createClient(); zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0); zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0); try { @@ -364,20 +362,27 @@ zk.delete("/parent", -1); zk.close(); } - - private static class HammerThread extends Thread { - private static final long LATENCY = 5; + + private static final long HAMMERTHREAD_LATENCY = 5; + + private static abstract class HammerThread extends Thread { + protected final int count; + protected volatile int current = 0; + HammerThread(String name, int count) { + super(name); + this.count = count; + } + } + + private static class BasicHammerThread extends HammerThread { private final ZooKeeper zk; private final String prefix; - private final int count; - private volatile int current = 0; - HammerThread(String name, ZooKeeper zk, String prefix, int count) { - super(name); + BasicHammerThread(String name, ZooKeeper zk, String prefix, int count) { + super(name, count); this.zk = zk; this.prefix = prefix; - this.count = count; } public void run() { @@ -385,87 +390,129 @@ try { for (; current < count; current++) { // Simulate a bit of network latency... - Thread.sleep(LATENCY); + Thread.sleep(HAMMERTHREAD_LATENCY); zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0); } - } catch (Exception e) { - LOG.error("Client create operation failed", e); + } catch (Throwable t) { + LOG.error("Client create operation failed", t); } finally { - if (zk != null) { - try { - zk.close(); - } catch (InterruptedException e) { - LOG.warn("Unexpected", e); - } + try { + zk.close(); + } catch (InterruptedException e) { + LOG.warn("Unexpected", e); } } } } - /* - * Verify that all of the servers see the same number of nodes - * at the root - */ - void verifyRootOfAllServersMatch(String hostPort) - throws InterruptedException, KeeperException, IOException - { - String parts[] = hostPort.split(","); + private static class SuperHammerThread extends HammerThread { + private final ClientTest parent; + private final String prefix; - // run through till the counts no longer change on each server - // max 15 tries, with 2 second sleeps, so approx 30 seconds - int[] counts = new int[parts.length]; - for (int j = 0; j < 100; j++) { - int newcounts[] = new int[parts.length]; - int i = 0; - for (String hp : parts) { - ZooKeeper zk = createClient(this, hp); - try { - newcounts[i++] = zk.getChildren("/", false).size(); - } finally { - zk.close(); - } - } + SuperHammerThread(String name, ClientTest parent, String prefix, + int count) + { + super(name, count); + this.parent = parent; + this.prefix = prefix; + } - if (Arrays.equals(newcounts, counts)) { - LOG.info("Found match with array:" - + Arrays.toString(newcounts)); - counts = newcounts; - break; - } else { - counts = newcounts; - Thread.sleep(10000); + public void run() { + byte b[] = new byte[256]; + try { + for (; current < count; current++) { + ZooKeeper zk = parent.createClient(); + try { + zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0); + } finally { + try { + zk.close(); + } catch (InterruptedException e) { + LOG.warn("Unexpected", e); + } + } + } + } catch (Throwable t) { + LOG.error("Client create operation failed", t); } } + } - // verify all the servers reporting same number of nodes - for (int i = 1; i < parts.length; i++) { - assertEquals("node count not consistent", counts[i-1], counts[i]); + /** + * Separate threads each creating a number of nodes. Each thread + * is using a non-shared (owned by thread) client for all node creations. + * @throws Throwable + */ + @Test + public void testHammerBasic() throws Throwable { + try { + final int threadCount = 10; + final int childCount = 1000; + + HammerThread[] threads = new HammerThread[threadCount]; + long start = System.currentTimeMillis(); + for (int i = 0; i < threads.length; i++) { + ZooKeeper zk = createClient(); + String prefix = "/test-" + i; + zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + prefix += "/"; + HammerThread thread = + new BasicHammerThread("BasicHammerThread-" + i, zk, prefix, + childCount); + thread.start(); + + threads[i] = thread; + } + + verifyHammer(start, threads, childCount); + } catch (Throwable t) { + LOG.error("test failed", t); + throw t; } } - - + + /** + * Separate threads each creating a number of nodes. Each thread + * is creating a new client for each node creation. + * @throws Throwable + */ @Test - public void testHammer() - throws IOException, InterruptedException, KeeperException - { - final int threadCount = 10; - final int childCount = 1000; - - HammerThread[] threads = new HammerThread[threadCount]; - long start = System.currentTimeMillis(); - for (int i = 0; i < threads.length; i++) { - Thread.sleep(10); - ZooKeeper zk = createClient(this); - String prefix = "/test-" + i; - zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); - prefix += "/"; - HammerThread thread = - new HammerThread("HammerThread-" + i, zk, prefix, childCount); - thread.start(); + public void testHammerSuper() throws Throwable { + try { + final int threadCount = 5; + final int childCount = 10; - threads[i] = thread; + HammerThread[] threads = new HammerThread[threadCount]; + long start = System.currentTimeMillis(); + for (int i = 0; i < threads.length; i++) { + String prefix = "/test-" + i; + { + ZooKeeper zk = createClient(); + try { + zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0); + } finally { + zk.close(); + } + } + prefix += "/"; + HammerThread thread = + new SuperHammerThread("SuperHammerThread-" + i, this, + prefix, childCount); + thread.start(); + + threads[i] = thread; + } + + verifyHammer(start, threads, childCount); + } catch (Throwable t) { + LOG.error("test failed", t); + throw t; } - + } + + public void verifyHammer(long start, HammerThread[] threads, int childCount) + throws IOException, InterruptedException, KeeperException + { // look for the clients to finish their create operations LOG.info("Starting check for completed hammers"); int workingCount = threads.length; @@ -493,44 +540,75 @@ for (HammerThread h : threads) { final int safetyFactor = 3; verifyThreadTerminated(h, - threadCount * childCount - * HammerThread.LATENCY * safetyFactor); + threads.length * childCount + * HAMMERTHREAD_LATENCY * safetyFactor); } LOG.info(new Date() + " Total time " + (System.currentTimeMillis() - start)); - ZooKeeper zk = createClient(this); - - LOG.info("******************* Connected to ZooKeeper" + new Date()); - for (int i = 0; i < threadCount; i++) { - LOG.info("Doing thread: " + i + " " + new Date()); - List<String> children = - zk.getChildren("/test-" + i, false); - assertEquals(childCount, children.size()); - } - for (int i = 0; i < threadCount; i++) { - List<String> children = - zk.getChildren("/test-" + i, false); - assertEquals(childCount, children.size()); + ZooKeeper zk = createClient(); + try { + + LOG.info("******************* Connected to ZooKeeper" + new Date()); + for (int i = 0; i < threads.length; i++) { + LOG.info("Doing thread: " + i + " " + new Date()); + List<String> children = + zk.getChildren("/test-" + i, false); + assertEquals(childCount, children.size()); + } + for (int i = 0; i < threads.length; i++) { + List<String> children = + zk.getChildren("/test-" + i, false); + assertEquals(childCount, children.size()); + } + } finally { + zk.close(); } } - - public class MyWatcher implements Watcher { - public void process(WatcherEvent event) { - ClientTest.this.process(event); + + private class VerifyClientCleanup extends Thread { + int count; + int current = 0; + + VerifyClientCleanup(String name, int count) { + super(name); + this.count = count; + } + + public void run() { + try { + for (; current < count; current++) { + ZooKeeper zk = createClient(); + zk.close(); + } + } catch (Throwable t) { + LOG.error("test failed", t); + } } } - public void process(WatcherEvent event) { - if (event.getState() == Event.KeeperStateSyncConnected) { - clientConnected.countDown(); + /** + * Verify that the client is cleaning up properly. Open/close a large + * number of sessions. Essentially looking to see if sockets/selectors + * are being cleaned up properly during close. + * + * @throws Throwable + */ + @Test + public void testClientCleanup() throws Throwable { + final int threadCount = 20; + final int clientCount = 100; + + VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount]; + + for (int i = 0; i < threads.length; i++) { + threads[i] = new VerifyClientCleanup("VCC" + i, clientCount); + threads[i].start(); } - if (event.getType() != Event.EventNone) { - try { - events.put(event); - } catch (InterruptedException e) { - e.printStackTrace(); - } + + for (int i = 0; i < threads.length; i++) { + threads[i].join(600000); + assertTrue(threads[i].current == threads[i].count); } } } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java Wed Aug 27 16:36:40 2008 @@ -43,7 +43,6 @@ LOG.info("FINISHED " + getName()); } - public void testRootWatchTriggered() throws Exception { class MyWatcher implements Watcher{ boolean fired=false; Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Aug 27 16:36:40 2008 @@ -24,17 +24,22 @@ import java.util.ArrayList; import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumStats; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.junit.After; import org.junit.Before; +import org.junit.Test; -public class QuorumTest extends ClientTest { +public class QuorumTest extends ClientBase { private static final Logger LOG = Logger.getLogger(QuorumTest.class); + private ClientTest ct = new ClientTest(); + File s1dir, s2dir, s3dir, s4dir, s5dir; QuorumPeer s1, s2, s3, s4, s5; + @Before @Override protected void setUp() throws Exception { @@ -43,6 +48,7 @@ setupTestEnv(); hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185"; + ct.hostPort = hostPort; s1dir = ClientBase.createTmpDir(); s2dir = ClientBase.createTmpDir(); @@ -126,4 +132,47 @@ LOG.debug("QP interrupted", e); } } + + @Test + public void testDeleteWithChildren() throws Exception { + ct.testDeleteWithChildren(); + } + + @Test + public void testHammerBasic() throws Throwable { + ct.testHammerBasic(); + } + + @Test + public void testPing() throws Exception { + ct.testPing(); + } + + @Test + public void testSequentialNodeNames() + throws IOException, InterruptedException, KeeperException + { + ct.testSequentialNodeNames(); + } + + @Test + public void testACLs() throws Exception { + ct.testACLs(); + } + + @Test + public void testClientwithoutWatcherObj() throws IOException, + InterruptedException, KeeperException + { + ct.testClientwithoutWatcherObj(); + } + + @Test + public void testClientWithWatcherObj() throws IOException, + InterruptedException, KeeperException + { + ct.testClientWithWatcherObj(); + } + + // skip superhammer and clientcleanup as they are too expensive for quorum } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Wed Aug 27 16:36:40 2008 @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import junit.framework.TestCase; @@ -44,9 +45,9 @@ private static final String HOSTPORT = "127.0.0.1:33299"; - private CountDownLatch startSignal; - private NIOServerCnxn.Factory serverFactory; + + private CountDownLatch startSignal; @Override protected void setUp() throws Exception { @@ -78,12 +79,26 @@ LOG.info("FINISHED " + getName()); } + private static class CountdownWatcher implements Watcher { + volatile CountDownLatch clientConnected = new CountDownLatch(1); + + public void process(WatcherEvent event) { + if (event.getState() == Event.KeeperStateSyncConnected) { + clientConnected.countDown(); + } + } + } + private ZooKeeper createClient() throws IOException, InterruptedException { - startSignal = new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); - startSignal.await(); + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher); + if(!watcher.clientConnected.await(CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS)) + { + fail("Unable to connect to server"); + } return zk; } Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java?rev=689668&r1=689667&r2=689668&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java Wed Aug 27 16:36:40 2008 @@ -25,20 +25,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.proto.WatcherEvent; import org.junit.Test; public class SyncCallTest extends ClientBase - implements Watcher, ChildrenCallback, StringCallback, VoidCallback + implements ChildrenCallback, StringCallback, VoidCallback { - private CountDownLatch clientConnected; private CountDownLatch opsCount; List<Integer> results = new LinkedList<Integer>(); @@ -59,7 +56,7 @@ for(int i = 0; i < 100; i++) zk.delete("/test" + i, 0, this, results); for(int i = 0; i < 100; i++) - zk.getChildren("/", this, this, results); + zk.getChildren("/", new NullWatcher(), this, results); LOG.info("Submitted all operations:" + (new Date()).toString()); @@ -74,22 +71,6 @@ System.out.println(e.toString()); } } - - private ZooKeeper createClient() throws IOException,InterruptedException{ - clientConnected=new CountDownLatch(1); - ZooKeeper zk = new ZooKeeper(hostPort, 30000, this); - if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){ - fail("Unable to connect to server"); - } - return zk; - } - - public void process(WatcherEvent event) { - //LOG.info("Process: " + event.getType() + " " + event.getPath()); - if (event.getState() == Event.KeeperStateSyncConnected) { - clientConnected.countDown(); - } - } @SuppressWarnings("unchecked") public void processResult(int rc, String path, Object ctx,