Author: mahadev
Date: Wed Nov 19 02:22:19 2008
New Revision: 718926

URL: http://svn.apache.org/viewvc?rev=718926&view=rev
Log:
ZOOKEEPER-204. SetWatches needs to be the first message after auth messages to the server (ben via mahadev)
Added: hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Modified: hadoop/zookeeper/branches/branch-3.0/CHANGES.txt hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java Modified: hadoop/zookeeper/branches/branch-3.0/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/CHANGES.txt?rev=718926&r1=718925&r2=718926&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.0/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.0/CHANGES.txt Wed Nov 19 02:22:19 2008 @@ -30,6 +30,9 @@ ZOOKEEPER-226. fix exists calls that fail on server if node has null data. (mahadev) + ZOOKEEPER-204. 
SetWatches needs to be the first message after auth messages +to the server (ben via mahadev) + Release 3.0.0 - 2008-10-21 Non-backward compatible changes: Modified: hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=718926&r1=718925&r2=718926&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Nov 19 02:22:19 2008 @@ -336,87 +336,96 @@ try { while (true) { Object event = waitingEvents.take(); - if (event == eventOfDeath) { - break; - } - - if (event instanceof WatcherSetEventPair) { - // each watcher will process the event - WatcherSetEventPair pair = (WatcherSetEventPair)event; - for (Watcher watcher: pair.watchers) { - watcher.process(pair.event); - } - } else { - Packet p = (Packet) event; - int rc = 0; - String path = p.path; - if (p.replyHeader.getErr() != 0) { - rc = p.replyHeader.getErr(); + try { + if (event == eventOfDeath) { + break; } - if (p.cb == null) { - LOG.warn("Somehow a null cb got to EventThread!"); - } else if (p.response instanceof ExistsResponse - || p.response instanceof SetDataResponse - || p.response instanceof SetACLResponse) { - StatCallback cb = (StatCallback) p.cb; - if (rc == 0) { - if (p.response instanceof ExistsResponse) { - cb.processResult(rc, path, p.ctx, - ((ExistsResponse) p.response) - .getStat()); - } else if (p.response instanceof SetDataResponse) { - cb.processResult(rc, path, p.ctx, - ((SetDataResponse) p.response) - .getStat()); - } else if (p.response instanceof SetACLResponse) { - cb.processResult(rc, path, p.ctx, - ((SetACLResponse) p.response) - .getStat()); + + if (event instanceof WatcherSetEventPair) { + // each watcher 
will process the event + WatcherSetEventPair pair = (WatcherSetEventPair) event; + for (Watcher watcher : pair.watchers) { + try { + watcher.process(pair.event); + } catch (Throwable t) { + LOG.error("Error while calling watcher ", t); } - } else { - cb.processResult(rc, path, p.ctx, null); - } - } else if (p.response instanceof GetDataResponse) { - DataCallback cb = (DataCallback) p.cb; - GetDataResponse rsp = (GetDataResponse) p.response; - if (rc == 0) { - cb.processResult(rc, path, p.ctx, - rsp.getData(), rsp.getStat()); - } else { - cb.processResult(rc, path, p.ctx, null, null); - } - } else if (p.response instanceof GetACLResponse) { - ACLCallback cb = (ACLCallback) p.cb; - GetACLResponse rsp = (GetACLResponse) p.response; - if (rc == 0) { - cb.processResult(rc, path, p.ctx, rsp.getAcl(), - rsp.getStat()); - } else { - cb.processResult(rc, path, p.ctx, null, null); } - } else if (p.response instanceof GetChildrenResponse) { - ChildrenCallback cb = (ChildrenCallback) p.cb; - GetChildrenResponse rsp = (GetChildrenResponse) p.response; - if (rc == 0) { - cb.processResult(rc, path, p.ctx, rsp - .getChildren()); - } else { - cb.processResult(rc, path, p.ctx, null); + } else { + Packet p = (Packet) event; + int rc = 0; + String path = p.path; + if (p.replyHeader.getErr() != 0) { + rc = p.replyHeader.getErr(); } - } else if (p.response instanceof CreateResponse) { - StringCallback cb = (StringCallback) p.cb; - CreateResponse rsp = (CreateResponse) p.response; - if (rc == 0) { - cb - .processResult(rc, path, p.ctx, rsp - .getPath()); - } else { - cb.processResult(rc, path, p.ctx, null); + if (p.cb == null) { + LOG.warn("Somehow a null cb got to EventThread!"); + } else if (p.response instanceof ExistsResponse + || p.response instanceof SetDataResponse + || p.response instanceof SetACLResponse) { + StatCallback cb = (StatCallback) p.cb; + if (rc == 0) { + if (p.response instanceof ExistsResponse) { + cb.processResult(rc, path, p.ctx, + ((ExistsResponse) p.response) + 
.getStat()); + } else if (p.response instanceof SetDataResponse) { + cb.processResult(rc, path, p.ctx, + ((SetDataResponse) p.response) + .getStat()); + } else if (p.response instanceof SetACLResponse) { + cb.processResult(rc, path, p.ctx, + ((SetACLResponse) p.response) + .getStat()); + } + } else { + cb.processResult(rc, path, p.ctx, null); + } + } else if (p.response instanceof GetDataResponse) { + DataCallback cb = (DataCallback) p.cb; + GetDataResponse rsp = (GetDataResponse) p.response; + if (rc == 0) { + cb.processResult(rc, path, p.ctx, rsp + .getData(), rsp.getStat()); + } else { + cb.processResult(rc, path, p.ctx, null, + null); + } + } else if (p.response instanceof GetACLResponse) { + ACLCallback cb = (ACLCallback) p.cb; + GetACLResponse rsp = (GetACLResponse) p.response; + if (rc == 0) { + cb.processResult(rc, path, p.ctx, rsp + .getAcl(), rsp.getStat()); + } else { + cb.processResult(rc, path, p.ctx, null, + null); + } + } else if (p.response instanceof GetChildrenResponse) { + ChildrenCallback cb = (ChildrenCallback) p.cb; + GetChildrenResponse rsp = (GetChildrenResponse) p.response; + if (rc == 0) { + cb.processResult(rc, path, p.ctx, rsp + .getChildren()); + } else { + cb.processResult(rc, path, p.ctx, null); + } + } else if (p.response instanceof CreateResponse) { + StringCallback cb = (StringCallback) p.cb; + CreateResponse rsp = (CreateResponse) p.response; + if (rc == 0) { + cb.processResult(rc, path, p.ctx, rsp + .getPath()); + } else { + cb.processResult(rc, path, p.ctx, null); + } + } else if (p.cb instanceof VoidCallback) { + VoidCallback cb = (VoidCallback) p.cb; + cb.processResult(rc, path, p.ctx); } - } else if (p.cb instanceof VoidCallback) { - VoidCallback cb = (VoidCallback) p.cb; - cb.processResult(rc, path, p.ctx); } + } catch (Throwable t) { + LOG.error("Caught unexpected throwable", t); } } } catch (InterruptedException e) { @@ -504,15 +513,6 @@ sessionPasswd = conRsp.getPasswd(); eventThread.queueEvent(new 
WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected, null)); - if (!disableAutoWatchReset) { - SetWatches sw = new SetWatches(lastZxid, - zooKeeper.getDataWatches(), - zooKeeper.getExistWatches(), - zooKeeper.getChildWatches()); - RequestHeader h = new RequestHeader(); - h.setType(ZooDefs.OpCode.setWatches); - queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null); - } } void readResponse() throws IOException { @@ -702,6 +702,20 @@ bb.putInt(bb.capacity() - 4); bb.rewind(); synchronized (outgoingQueue) { + // We add backwards since we are pushing into the front + if (!disableAutoWatchReset) { + SetWatches sw = new SetWatches(lastZxid, + zooKeeper.getDataWatches(), + zooKeeper.getExistWatches(), + zooKeeper.getChildWatches()); + RequestHeader h = new RequestHeader(); + h.setType(ZooDefs.OpCode.setWatches); + h.setXid(-8); + Packet packet = new Packet(h, new ReplyHeader(), sw, null, null, + null); + outgoingQueue.addFirst(packet); + } + for (AuthData id : authInfo) { outgoingQueue.addFirst(new Packet(new RequestHeader(-4, OpCode.auth), null, new AuthPacket(0, id.scheme, Modified: hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=718926&r1=718925&r2=718926&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java (original) +++ hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Nov 19 02:22:19 2008 @@ -192,8 +192,11 @@ } // XXX This shouldn't be needed, but just in case synchronized (existWatches) { - addTo(existWatches.remove(path), result); - LOG.warn("We are triggering an exists watch for delete! 
Shouldn't happen!"); + Set<Watcher> list = existWatches.remove(path); + if (list != null) { + addTo(existWatches.remove(path), result); + LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); + } } synchronized (childWatches) { addTo(childWatches.remove(path), result); Added: hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=718926&view=auto ============================================================================== --- hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (added) +++ hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Wed Nov 19 02:22:19 2008 @@ -0,0 +1,54 @@ +package org.apache.zookeeper; + +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.List; + +public class TestableZooKeeper extends ZooKeeper { + + public TestableZooKeeper(String host, int sessionTimeout, + Watcher watcher) throws IOException { + super(host, sessionTimeout, watcher); + } + + @Override + public List<String> getChildWatches() { + return super.getChildWatches(); + } + + + @Override + public List<String> getDataWatches() { + return super.getDataWatches(); + } + + + @Override + public List<String> getExistWatches() { + return super.getExistWatches(); + } + + + /** + * Cause this ZooKeeper object to stop receiving from the ZooKeeperServer + * for the given number of milliseconds. + * @param ms the number of milliseconds to pause. 
+ */ + public void pauseCnxn(final long ms) { + new Thread() { + public void run() { + synchronized(cnxn) { + try { + try { + ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close(); + } catch (IOException e) { + e.printStackTrace(); + } + Thread.sleep(ms); + } catch (InterruptedException e) { + } + } + } + }.start(); + } +} Modified: hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=718926&r1=718925&r2=718926&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java (original) +++ hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Nov 19 02:22:19 2008 @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.TestableZooKeeper; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -126,10 +127,10 @@ return createClient(watcher, hp); } - protected ZooKeeper createClient(CountdownWatcher watcher, String hp) + protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp) throws IOException, InterruptedException { - ZooKeeper zk = new ZooKeeper(hp, 9000, watcher); + TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher); if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { Modified: hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java?rev=718926&r1=718925&r2=718926&view=diff ============================================================================== --- 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java (original) +++ hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java Wed Nov 19 02:22:19 2008 @@ -27,7 +27,10 @@ import org.apache.zookeeper.ClientCnxn; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.TestableZooKeeper; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; @@ -40,6 +43,14 @@ public class WatcherTest extends ClientBase { protected static final Logger LOG = Logger.getLogger(WatcherTest.class); + private final class MyStatCallback implements StatCallback { + int rc; + public void processResult(int rc, String path, Object ctx, Stat stat) { + ((int[])ctx)[0]++; + this.rc = rc; + } + } + private class MyWatcher extends CountdownWatcher { LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<WatchedEvent>(); @@ -118,6 +129,57 @@ } } + final static int COUNT = 100; + boolean hasSeenDelete = true; + /** + * This test checks that watches for pending requests do not get triggered, + * but watches set by previous requests do. 
+ * + * @throws Exception + */ + @Test + public void testWatchAutoResetWithPending() throws Exception { + MyWatcher watches[] = new MyWatcher[COUNT]; + MyStatCallback cbs[] = new MyStatCallback[COUNT]; + MyWatcher watcher = new MyWatcher(); + int count[] = new int[1]; + TestableZooKeeper zk = createClient(watcher, hostPort); + ZooKeeper zk2 = createClient(watcher, hostPort); + zk2.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + for(int i = 0; i < COUNT/2; i++) { + watches[i] = new MyWatcher(); + cbs[i] = new MyStatCallback(); + zk.exists("/test", watches[i], cbs[i], count); + } + zk.exists("/test", false); + zk.pauseCnxn(4000); + Thread.sleep(50); + zk2.close(); + stopServer(); + watches[0].waitForDisconnected(3000); + for(int i = COUNT/2; i < COUNT; i++) { + watches[i] = new MyWatcher(); + cbs[i] = new MyStatCallback(); + zk.exists("/test", watches[i], cbs[i], count); + } + startServer(); + watches[49].waitForConnected(4000); + assertEquals(null, zk.exists("/test", false)); + Thread.sleep(10); + for(int i = 0; i < COUNT/2; i++) { + assertEquals("For " + i, 1, watches[i].events.size()); + } + for(int i = COUNT/2; i < COUNT; i++) { + if (cbs[i].rc == 0) { + assertEquals("For " +i, 1, watches[i].events.size()); + } else { + assertEquals("For " +i, 0, watches[i].events.size()); + } + } + assertEquals(COUNT, count[0]); + zk.close(); + } + @Test public void testWatcherAutoResetWithGlobal() throws Exception { ZooKeeper zk = null;