Author: breed Date: Sun Nov 8 22:46:04 2009 New Revision: 833938 URL: http://svn.apache.org/viewvc?rev=833938&view=rev Log: ZOOKEEPER-570. AsyncHammerTest is broken, callbacks need to validate rc parameter
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java?rev=833938&r1=833937&r2=833938&view=diff ============================================================================== --- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java (original) +++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java Sun Nov 8 22:46:04 2009 @@ -27,14 +27,15 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.TestableZooKeeper; import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.DataCallback; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,6 +57,10 @@ } protected void restart() throws Exception { + LOG.info("RESTARTING " + getName()); + qb.tearDown(); + + // don't call setup - we don't want to reassign ports/dirs, etc... JMXEnv.setUp(); qb.startServers(); } @@ -71,21 +76,24 @@ /** * Create /test- sequence nodes asynchronously, max 30 outstanding */ - class HammerThread extends Thread - implements Watcher, StringCallback, VoidCallback - { + class HammerThread extends Thread implements StringCallback, VoidCallback { private static final int MAX_OUTSTANDING = 30; - private ZooKeeper zk; + private TestableZooKeeper zk; private int outstanding; + private volatile boolean failed = false; + public HammerThread(String name) { super(name); } public void run() { try { - zk = new ZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, this); + CountdownWatcher watcher = new CountdownWatcher(); + zk = new TestableZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, + watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); while(bang) { incOutstanding(); // before create otw race zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE, @@ -103,8 +111,12 @@ if (zk != null) { try { zk.close(); + if (!zk.testableWaitForShutdown(CONNECTION_TIMEOUT)) { + failed = true; + LOG.error("Client did not shutdown"); + } } catch (InterruptedException e) { - LOG.warn("Unexpected", e); + LOG.info("Interrupted", e); } } } @@ -128,43 +140,67 @@ } public void processResult(int rc, String path, Object ctx, String name) { + if (rc != KeeperException.Code.OK.intValue()) { + if (bang) { + failed = true; + LOG.error("Create failed for 0x" + + Long.toHexString(zk.getSessionId()) + + "with rc:" + rc + " path:" + path); + } + decOutstanding(); + return; + } try { decOutstanding(); - zk.delete(path, -1, this, null); + zk.delete(name, -1, this, null); } catch (Exception e) { - LOG.error("Client delete failed", e); + if (bang) { + failed = true; + LOG.error("Client delete failed", e); + } } } public void processResult(int rc, String path, Object ctx) { - // ignore for purposes of this test + if (rc != KeeperException.Code.OK.intValue()) { + if (bang) { + failed = true; + LOG.error("Delete failed for 0x" + + Long.toHexString(zk.getSessionId()) + + "with rc:" + rc + " path:" + path); + } + } } } @Test public void testHammer() throws Exception { bang = true; - Thread[] hammers = new Thread[100]; + LOG.info("Starting hammers"); + HammerThread[] hammers = new HammerThread[100]; for (int i = 0; i < hammers.length; i++) { hammers[i] = new HammerThread("HammerThread-" + i); hammers[i].start(); } + LOG.info("Started hammers"); Thread.sleep(5000); // allow the clients to run for max 5sec bang = false; + LOG.info("Stopping hammers"); for (int i = 0; i < hammers.length; i++) { hammers[i].interrupt(); verifyThreadTerminated(hammers[i], 60000); + assertFalse(hammers[i].failed); } + // before restart - QuorumBase qt = new QuorumBase(); - qt.setUp(); - qt.verifyRootOfAllServersMatch(qb.hostPort); - tearDown(); + LOG.info("Hammers stopped, verifying consistency"); + qb.verifyRootOfAllServersMatch(qb.hostPort); restart(); // after restart - qt.verifyRootOfAllServersMatch(qb.hostPort); + LOG.info("Verifying hammers 2"); + qb.verifyRootOfAllServersMatch(qb.hostPort); } @SuppressWarnings("unchecked")