Author: mahadev
Date: Wed Nov 19 02:22:19 2008
New Revision: 718926

URL: http://svn.apache.org/viewvc?rev=718926&view=rev
Log:
ZOOKEEPER-204. SetWatches needs to be the first message after auth messages to 
the server (ben via mahadev)

Added:
    
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
Modified:
    hadoop/zookeeper/branches/branch-3.0/CHANGES.txt
    
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java
    
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java
    
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java
    
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java

Modified: hadoop/zookeeper/branches/branch-3.0/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/CHANGES.txt?rev=718926&r1=718925&r2=718926&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.0/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.0/CHANGES.txt Wed Nov 19 02:22:19 2008
@@ -30,6 +30,9 @@
    ZOOKEEPER-226. fix exists calls that fail on server if node has null data.
    (mahadev) 
 
+   ZOOKEEPER-204. SetWatches needs to be the first message after auth messages
+to the server (ben via mahadev)
+
 Release 3.0.0 - 2008-10-21
 
 Non-backward compatible changes:

Modified: 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=718926&r1=718925&r2=718926&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ClientCnxn.java
 Wed Nov 19 02:22:19 2008
@@ -336,87 +336,96 @@
             try {
                 while (true) {
                     Object event = waitingEvents.take();
-                    if (event == eventOfDeath) {
-                        break;
-                    }
-
-                    if (event instanceof WatcherSetEventPair) {
-                        // each watcher will process the event
-                        WatcherSetEventPair pair = (WatcherSetEventPair)event;
-                        for (Watcher watcher: pair.watchers) {
-                            watcher.process(pair.event);
-                        }
-                    } else {
-                        Packet p = (Packet) event;
-                        int rc = 0;
-                        String path = p.path;
-                        if (p.replyHeader.getErr() != 0) {
-                            rc = p.replyHeader.getErr();
+                    try {
+                        if (event == eventOfDeath) {
+                            break;
                         }
-                        if (p.cb == null) {
-                            LOG.warn("Somehow a null cb got to EventThread!");
-                        } else if (p.response instanceof ExistsResponse
-                                || p.response instanceof SetDataResponse
-                                || p.response instanceof SetACLResponse) {
-                            StatCallback cb = (StatCallback) p.cb;
-                            if (rc == 0) {
-                                if (p.response instanceof ExistsResponse) {
-                                    cb.processResult(rc, path, p.ctx,
-                                            ((ExistsResponse) p.response)
-                                                    .getStat());
-                                } else if (p.response instanceof 
SetDataResponse) {
-                                    cb.processResult(rc, path, p.ctx,
-                                            ((SetDataResponse) p.response)
-                                                    .getStat());
-                                } else if (p.response instanceof 
SetACLResponse) {
-                                    cb.processResult(rc, path, p.ctx,
-                                            ((SetACLResponse) p.response)
-                                                    .getStat());
+
+                        if (event instanceof WatcherSetEventPair) {
+                            // each watcher will process the event
+                            WatcherSetEventPair pair = (WatcherSetEventPair) 
event;
+                            for (Watcher watcher : pair.watchers) {
+                                try {
+                                    watcher.process(pair.event);
+                                } catch (Throwable t) {
+                                    LOG.error("Error while calling watcher ", 
t);
                                 }
-                            } else {
-                                cb.processResult(rc, path, p.ctx, null);
-                            }
-                        } else if (p.response instanceof GetDataResponse) {
-                            DataCallback cb = (DataCallback) p.cb;
-                            GetDataResponse rsp = (GetDataResponse) p.response;
-                            if (rc == 0) {
-                                cb.processResult(rc, path, p.ctx,
-                                        rsp.getData(), rsp.getStat());
-                            } else {
-                                cb.processResult(rc, path, p.ctx, null, null);
-                            }
-                        } else if (p.response instanceof GetACLResponse) {
-                            ACLCallback cb = (ACLCallback) p.cb;
-                            GetACLResponse rsp = (GetACLResponse) p.response;
-                            if (rc == 0) {
-                                cb.processResult(rc, path, p.ctx, rsp.getAcl(),
-                                        rsp.getStat());
-                            } else {
-                                cb.processResult(rc, path, p.ctx, null, null);
                             }
-                        } else if (p.response instanceof GetChildrenResponse) {
-                            ChildrenCallback cb = (ChildrenCallback) p.cb;
-                            GetChildrenResponse rsp = (GetChildrenResponse) 
p.response;
-                            if (rc == 0) {
-                                cb.processResult(rc, path, p.ctx, rsp
-                                        .getChildren());
-                            } else {
-                                cb.processResult(rc, path, p.ctx, null);
+                        } else {
+                            Packet p = (Packet) event;
+                            int rc = 0;
+                            String path = p.path;
+                            if (p.replyHeader.getErr() != 0) {
+                                rc = p.replyHeader.getErr();
                             }
-                        } else if (p.response instanceof CreateResponse) {
-                            StringCallback cb = (StringCallback) p.cb;
-                            CreateResponse rsp = (CreateResponse) p.response;
-                            if (rc == 0) {
-                                cb
-                                        .processResult(rc, path, p.ctx, rsp
-                                                .getPath());
-                            } else {
-                                cb.processResult(rc, path, p.ctx, null);
+                            if (p.cb == null) {
+                                LOG.warn("Somehow a null cb got to 
EventThread!");
+                            } else if (p.response instanceof ExistsResponse
+                                    || p.response instanceof SetDataResponse
+                                    || p.response instanceof SetACLResponse) {
+                                StatCallback cb = (StatCallback) p.cb;
+                                if (rc == 0) {
+                                    if (p.response instanceof ExistsResponse) {
+                                        cb.processResult(rc, path, p.ctx,
+                                                ((ExistsResponse) p.response)
+                                                        .getStat());
+                                    } else if (p.response instanceof 
SetDataResponse) {
+                                        cb.processResult(rc, path, p.ctx,
+                                                ((SetDataResponse) p.response)
+                                                        .getStat());
+                                    } else if (p.response instanceof 
SetACLResponse) {
+                                        cb.processResult(rc, path, p.ctx,
+                                                ((SetACLResponse) p.response)
+                                                        .getStat());
+                                    }
+                                } else {
+                                    cb.processResult(rc, path, p.ctx, null);
+                                }
+                            } else if (p.response instanceof GetDataResponse) {
+                                DataCallback cb = (DataCallback) p.cb;
+                                GetDataResponse rsp = (GetDataResponse) 
p.response;
+                                if (rc == 0) {
+                                    cb.processResult(rc, path, p.ctx, rsp
+                                            .getData(), rsp.getStat());
+                                } else {
+                                    cb.processResult(rc, path, p.ctx, null,
+                                            null);
+                                }
+                            } else if (p.response instanceof GetACLResponse) {
+                                ACLCallback cb = (ACLCallback) p.cb;
+                                GetACLResponse rsp = (GetACLResponse) 
p.response;
+                                if (rc == 0) {
+                                    cb.processResult(rc, path, p.ctx, rsp
+                                            .getAcl(), rsp.getStat());
+                                } else {
+                                    cb.processResult(rc, path, p.ctx, null,
+                                            null);
+                                }
+                            } else if (p.response instanceof 
GetChildrenResponse) {
+                                ChildrenCallback cb = (ChildrenCallback) p.cb;
+                                GetChildrenResponse rsp = 
(GetChildrenResponse) p.response;
+                                if (rc == 0) {
+                                    cb.processResult(rc, path, p.ctx, rsp
+                                            .getChildren());
+                                } else {
+                                    cb.processResult(rc, path, p.ctx, null);
+                                }
+                            } else if (p.response instanceof CreateResponse) {
+                                StringCallback cb = (StringCallback) p.cb;
+                                CreateResponse rsp = (CreateResponse) 
p.response;
+                                if (rc == 0) {
+                                    cb.processResult(rc, path, p.ctx, rsp
+                                            .getPath());
+                                } else {
+                                    cb.processResult(rc, path, p.ctx, null);
+                                }
+                            } else if (p.cb instanceof VoidCallback) {
+                                VoidCallback cb = (VoidCallback) p.cb;
+                                cb.processResult(rc, path, p.ctx);
                             }
-                        } else if (p.cb instanceof VoidCallback) {
-                            VoidCallback cb = (VoidCallback) p.cb;
-                            cb.processResult(rc, path, p.ctx);
                         }
+                    } catch (Throwable t) {
+                        LOG.error("Caught unexpected throwable", t);
                     }
                 }
             } catch (InterruptedException e) {
@@ -504,15 +513,6 @@
             sessionPasswd = conRsp.getPasswd();
             eventThread.queueEvent(new 
WatchedEvent(Watcher.Event.EventType.None,
                     Watcher.Event.KeeperState.SyncConnected, null));
-            if (!disableAutoWatchReset) {
-                SetWatches sw = new SetWatches(lastZxid,
-                        zooKeeper.getDataWatches(),
-                        zooKeeper.getExistWatches(),
-                        zooKeeper.getChildWatches());
-                RequestHeader h = new RequestHeader();
-                h.setType(ZooDefs.OpCode.setWatches);
-                queuePacket(h, new ReplyHeader(), sw, null, null, null, null, 
null);
-            }
         }
 
         void readResponse() throws IOException {
@@ -702,6 +702,20 @@
             bb.putInt(bb.capacity() - 4);
             bb.rewind();
             synchronized (outgoingQueue) {
+                // We add backwards since we are pushing into the front
+                if (!disableAutoWatchReset) {
+                    SetWatches sw = new SetWatches(lastZxid,
+                            zooKeeper.getDataWatches(),
+                            zooKeeper.getExistWatches(),
+                            zooKeeper.getChildWatches());
+                    RequestHeader h = new RequestHeader();
+                    h.setType(ZooDefs.OpCode.setWatches);
+                    h.setXid(-8);
+                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, 
null,
+                                null);
+                    outgoingQueue.addFirst(packet);
+                }
+
                 for (AuthData id : authInfo) {
                     outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                             OpCode.auth), null, new AuthPacket(0, id.scheme,

Modified: 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=718926&r1=718925&r2=718926&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.0/src/java/main/org/apache/zookeeper/ZooKeeper.java
 Wed Nov 19 02:22:19 2008
@@ -192,8 +192,11 @@
                 }
                 // XXX This shouldn't be needed, but just in case
                 synchronized (existWatches) {
-                    addTo(existWatches.remove(path), result);
-                    LOG.warn("We are triggering an exists watch for delete! 
Shouldn't happen!");
+                    Set<Watcher> list = existWatches.remove(path);
+                    if (list != null) {
+                        addTo(existWatches.remove(path), result);
+                        LOG.warn("We are triggering an exists watch for 
delete! Shouldn't happen!");
+                    }
                 }
                 synchronized (childWatches) {
                     addTo(childWatches.remove(path), result);

Added: 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=718926&view=auto
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
 (added)
+++ 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
 Wed Nov 19 02:22:19 2008
@@ -0,0 +1,54 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+
+public class TestableZooKeeper extends ZooKeeper {
+
+    public TestableZooKeeper(String host, int sessionTimeout,
+            Watcher watcher) throws IOException {
+        super(host, sessionTimeout, watcher);
+    }
+    
+    @Override
+    public List<String> getChildWatches() {
+        return super.getChildWatches();
+    }
+
+
+    @Override
+    public List<String> getDataWatches() {
+        return super.getDataWatches();
+    }
+
+
+    @Override
+    public List<String> getExistWatches() {
+        return super.getExistWatches();
+    }
+
+
+    /**
+     * Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
+     * for the given number of milliseconds.
+     * @param ms the number of milliseconds to pause.
+     */
+    public void pauseCnxn(final long ms) {
+        new Thread() {
+            public void run() {
+                synchronized(cnxn) {
+                    try {
+                        try {
+                            
((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        Thread.sleep(ms);
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        }.start();
+    }
+}

Modified: 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=718926&r1=718925&r2=718926&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/ClientBase.java
 Wed Nov 19 02:22:19 2008
@@ -33,6 +33,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -126,10 +127,10 @@
         return createClient(watcher, hp);
     }
 
-    protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
+    protected TestableZooKeeper createClient(CountdownWatcher watcher, String 
hp)
         throws IOException, InterruptedException
     {
-        ZooKeeper zk = new ZooKeeper(hp, 9000, watcher);
+        TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher);
         if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
                 TimeUnit.MILLISECONDS))
         {

Modified: 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java
URL: 
http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java?rev=718926&r1=718925&r2=718926&view=diff
==============================================================================
--- 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java
 (original)
+++ 
hadoop/zookeeper/branches/branch-3.0/src/java/test/org/apache/zookeeper/test/WatcherTest.java
 Wed Nov 19 02:22:19 2008
@@ -27,7 +27,10 @@
 import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -40,6 +43,14 @@
 public class WatcherTest extends ClientBase {
     protected static final Logger LOG = Logger.getLogger(WatcherTest.class);
 
+    private final class MyStatCallback implements StatCallback {
+        int rc;
+        public void processResult(int rc, String path, Object ctx, Stat stat) {
+            ((int[])ctx)[0]++;
+            this.rc = rc;
+        }
+    }
+
     private class MyWatcher extends CountdownWatcher {
         LinkedBlockingQueue<WatchedEvent> events =
             new LinkedBlockingQueue<WatchedEvent>();
@@ -118,6 +129,57 @@
         }
     }
 
+    final static int COUNT = 100;
+    boolean hasSeenDelete = true;
+    /**
+     * This test checks that watches for pending requests do not get triggered,
+     * but watches set by previous requests do.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testWatchAutoResetWithPending() throws Exception {
+       MyWatcher watches[] = new MyWatcher[COUNT];
+       MyStatCallback cbs[] = new MyStatCallback[COUNT];
+       MyWatcher watcher = new MyWatcher();
+       int count[] = new int[1];
+       TestableZooKeeper zk = createClient(watcher, hostPort);
+       ZooKeeper zk2 = createClient(watcher, hostPort);
+       zk2.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL);
+       for(int i = 0; i < COUNT/2; i++) {
+           watches[i] = new MyWatcher();
+           cbs[i] = new MyStatCallback();
+           zk.exists("/test", watches[i], cbs[i], count);
+       }
+       zk.exists("/test", false);
+       zk.pauseCnxn(4000);
+       Thread.sleep(50);
+       zk2.close();
+       stopServer();
+       watches[0].waitForDisconnected(3000);
+       for(int i = COUNT/2; i < COUNT; i++) {
+           watches[i] = new MyWatcher();
+           cbs[i] = new MyStatCallback();
+           zk.exists("/test", watches[i], cbs[i], count);
+       }
+       startServer();
+       watches[49].waitForConnected(4000);
+       assertEquals(null, zk.exists("/test", false));
+       Thread.sleep(10);
+       for(int i = 0; i < COUNT/2; i++) {
+           assertEquals("For " + i, 1, watches[i].events.size());
+       }
+       for(int i = COUNT/2; i < COUNT; i++) {
+           if (cbs[i].rc == 0) {
+               assertEquals("For " +i, 1, watches[i].events.size());
+           } else {
+               assertEquals("For " +i, 0, watches[i].events.size());
+           }
+       }
+       assertEquals(COUNT, count[0]);
+       zk.close();
+    }
+    
     @Test
     public void testWatcherAutoResetWithGlobal() throws Exception {
         ZooKeeper zk = null;


Reply via email to