Author: breed
Date: Wed Sep 24 14:05:26 2008
New Revision: 698734

URL: http://svn.apache.org/viewvc?rev=698734&view=rev
Log:
ZOOKEEPER-137 client watcher objects can lose events

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=698734&r1=698733&r2=698734&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Sep 24 14:05:26 2008
@@ -64,3 +64,5 @@
 
  ZOOKEEPER-131. Fix Old leader election can elect a dead leader over and over
  again. (breed via mahadev)
+
+ ZOOKEEPER-137. client watcher objects can lose events (Patrick Hunt via breed)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=698734&r1=698733&r2=698734&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Sep 24 14:05:26 2008
@@ -94,9 +94,6 @@
      */
     private LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
 
-    private LinkedBlockingQueue<Object> waitingEvents = 
-        new LinkedBlockingQueue<Object>();
-
     /**
      * These are the packets that need to be sent.
      */
@@ -112,7 +109,7 @@
 
     private final ZooKeeper zooKeeper;
 
-    private final Watcher watcher;
+    private final ClientWatchManager watcher;
 
     private long sessionId;
 
@@ -206,7 +203,7 @@
     }
 
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            Watcher watcher)
+            ClientWatchManager watcher)
         throws IOException
     {
         this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
@@ -226,7 +223,7 @@
      * @throws IOException
      */
     public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
-            Watcher watcher, long sessionId, byte[] sessionPasswd)
+            ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
         throws IOException
     {
         this.zooKeeper = zooKeeper;
@@ -251,11 +248,11 @@
         readTimeout = sessionTimeout * 2 / 3;
         Collections.shuffle(serverAddrs);
         sendThread = new SendThread();
-        sendThread.start();
         eventThread = new EventThread();
+        sendThread.start();
         eventThread.start();
     }
-
+    
     WatcherEvent eventOfDeath = new WatcherEvent();
 
     final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
@@ -264,12 +261,43 @@
         }
     };
 
+    private static class WatcherSetEventPair {
+        private final Set<Watcher> watchers;
+        private final WatcherEvent event;
+        
+        public WatcherSetEventPair(Set<Watcher> watchers, WatcherEvent event) {
+            this.watchers = watchers;
+            this.event = event;
+        }
+    }
+    
     class EventThread extends Thread {
+        private final LinkedBlockingQueue<Object> waitingEvents = 
+            new LinkedBlockingQueue<Object>();
+
         EventThread() {
             super(currentThread().getName() + "-EventThread");
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
         }
+        
+        public void queueEvent(WatcherEvent event) {
+            // materialize the watchers based on the event
+            WatcherSetEventPair pair = new WatcherSetEventPair(
+                    watcher.materialize(event.getState(), event.getType(),
+                            event.getPath()),
+                            event);
+            // queue the pair (watch set & event) for later processing
+            waitingEvents.add(pair);
+        }
+        
+        public void queuePacket(Packet packet) {
+            waitingEvents.add(packet);
+        }
+
+        public void queueEventOfDeath() {
+            waitingEvents.add(eventOfDeath);
+        }
 
         @Override
         public void run() {
@@ -279,8 +307,12 @@
                     if (event == eventOfDeath) {
                         break;
                     }
-                    if (event instanceof WatcherEvent) {
-                        watcher.process((WatcherEvent) event);
+                    if (event instanceof WatcherSetEventPair) {
+                        // each watcher will process the event
+                        WatcherSetEventPair pair = (WatcherSetEventPair)event;
+                        for (Watcher watcher: pair.watchers) {
+                            watcher.process(pair.event);
+                        }
                     } else {
                         Packet p = (Packet) event;
                         int rc = 0;
@@ -362,7 +394,6 @@
         }
     }
 
-    @SuppressWarnings("unchecked")
     private void finishPacket(Packet p) {
         if (p.watchRegistration != null) {
             p.watchRegistration.register(p.replyHeader.getErr());
@@ -375,7 +406,7 @@
             }
         } else {
             p.finished = true;
-            waitingEvents.add(p);
+            eventThread.queuePacket(p);
         }
     }
 
@@ -428,7 +459,7 @@
             int sessionTimeout = conRsp.getTimeOut();
             if (sessionTimeout <= 0) {
                 zooKeeper.state = States.CLOSED;
-                waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
+                eventThread.queueEvent(new WatcherEvent(Watcher.Event.EventNone,
                         Watcher.Event.KeeperStateExpired, null));
                 throw new IOException("Session Expired");
             }
@@ -436,11 +467,10 @@
             connectTimeout = sessionTimeout / serverAddrs.size();
             sessionId = conRsp.getSessionId();
             sessionPasswd = conRsp.getPasswd();
-            waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
+            eventThread.queueEvent(new WatcherEvent(Watcher.Event.EventNone,
                     Watcher.Event.KeeperStateSyncConnected, null));
         }
 
-        @SuppressWarnings("unchecked")
         void readResponse() throws IOException {
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
@@ -461,9 +491,13 @@
                 // -1 means notification
                 WatcherEvent event = new WatcherEvent();
                 event.deserialize(bbia, "response");
-                // System.out.println("Got an event: " + event + " for " +
-                // sessionId + " through" + _cnxn);
-                waitingEvents.add(event);
+                
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got an event: " + event + " for sessionid 0x"
+                            + Long.toHexString(sessionId));
+                }
+                
+                eventThread.queueEvent(event);
                 return;
             }
             if (pendingQueue.size() == 0) {
@@ -763,8 +797,10 @@
                                 e);
                         cleanup();
                         if (zooKeeper.state.isAlive()) {
-                            waitingEvents.add(new WatcherEvent(Event.EventNone,
-                                    Event.KeeperStateDisconnected, null));
+                            eventThread.queueEvent(new WatcherEvent(
+                                    Event.EventNone,
+                                    Event.KeeperStateDisconnected,
+                                    null));
                         }
     
                         now = System.currentTimeMillis();
@@ -842,13 +878,12 @@
      * method is primarily here to allow the tests to verify disconnection
      * behavior.
      */
-    @SuppressWarnings("unchecked")
     public void disconnect() {
         LOG.info("Disconnecting ClientCnxn for session: 0x" 
                 + Long.toHexString(getSessionId()));
 
         sendThread.close();
-        waitingEvents.add(eventOfDeath);
+        eventThread.queueEventOfDeath();
     }
 
     /**

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=698734&r1=698733&r2=698734&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Sep 24 14:05:26 2008
@@ -106,37 +106,35 @@
 public class ZooKeeper {
     private static final Logger LOG = Logger.getLogger(ZooKeeper.class);
 
-    private volatile Watcher defaultWatcher;
-
-    private final Map<String, Set<Watcher>> dataWatches =
-        new HashMap<String, Set<Watcher>>();
-    private final Map<String, Set<Watcher>> childWatches =
-        new HashMap<String, Set<Watcher>>();
-
+    private final ZKWatchManager watchManager = new ZKWatchManager();
+    
     /**
-     * Process watch events generated by the ClientCnxn object.
+     * Manage watchers & handle events generated by the ClientCnxn object.
      * 
      * We are implementing this as a nested class of ZooKeeper so that
-     * the public Watcher.process(event) method will not be exposed as part 
-     * of the ZooKeeper client API.
+     * the public methods will not be exposed as part of the ZooKeeper client
+     * API.
      */
-    private class ZKWatcher implements Watcher {
-        /**
-         * Process a WatchEvent.
-         *
-         * Looks up the watch in the set of watches, processes the event
-         * if found, otw uses the default watcher (registered during instance
-         * creation) to process the watch.
-         *
-         * @param event the event to process.
+    private class ZKWatchManager implements ClientWatchManager {
+        private final Map<String, Set<Watcher>> dataWatches =
+            new HashMap<String, Set<Watcher>>();
+        private final Map<String, Set<Watcher>> childWatches =
+            new HashMap<String, Set<Watcher>>();
+
+        private volatile Watcher defaultWatcher;
+
+        /* (non-Javadoc)
+         * @see org.apache.zookeeper.ClientWatchManager#materialize(int, int, java.lang.String)
          */
-        public void process(WatcherEvent event) {
+        public Set<Watcher> materialize(int state, int type, String path) {
+            Set<Watcher> result = new HashSet<Watcher>();
+            
             // clear the watches if we are not connected
-            if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
+            if (state != Watcher.Event.KeeperStateSyncConnected) {
                 synchronized (dataWatches) {
                     for (Set<Watcher> watchers : dataWatches.values()) {
                         for (Watcher watcher : watchers) {
-                            watcher.process(event);
+                            result.add(watcher);
                         }
                     }
                     dataWatches.clear();
@@ -144,7 +142,7 @@
                 synchronized (childWatches) {
                     for (Set<Watcher> watchers : childWatches.values()) {
                         for (Watcher watcher : watchers) {
-                            watcher.process(event);
+                            result.add(watcher);
                         }
                     }
                     childWatches.clear();
@@ -153,28 +151,28 @@
     
             Set<Watcher> watchers = null;
     
-            switch (event.getType()) {
+            switch (type) {
             case Watcher.Event.EventNone:
-                defaultWatcher.process(event);
-                return;
+                result.add(defaultWatcher);
+                return result;
             case Watcher.Event.EventNodeDataChanged:
             case Watcher.Event.EventNodeCreated:
                 synchronized (dataWatches) {
-                    watchers = dataWatches.remove(event.getPath());
+                    watchers = dataWatches.remove(path);
                 }
                 break;
             case Watcher.Event.EventNodeChildrenChanged:
                 synchronized (childWatches) {
-                    watchers = childWatches.remove(event.getPath());
+                    watchers = childWatches.remove(path);
                 }
                 break;
             case Watcher.Event.EventNodeDeleted:
                 synchronized (dataWatches) {
-                    watchers = dataWatches.remove(event.getPath());
+                    watchers = dataWatches.remove(path);
                 }
                 Set<Watcher> cwatches;
                 synchronized (childWatches) {
-                    cwatches = childWatches.remove(event.getPath());
+                    cwatches = childWatches.remove(path);
                 }
                 if (cwatches != null) {
                     if (watchers == null) {
@@ -185,16 +183,17 @@
                 }
                 break;
             default:
-                String msg = "Unhandled watch event type " + event.getType();
+                String msg = "Unhandled watch event type " + type
+                    + " with state " + state + " on path " + path;
                 LOG.error(msg);
                 throw new RuntimeException(msg);
             }
     
-            if (watchers != null) {
-                for (Watcher watcher : watchers) {
-                    watcher.process(event);
-                }
-            }
+            // watchers is null when no watch is registered for this path
+            if (watchers != null) {
+                result.addAll(watchers);
+            }
+            return result;
         }
     }
 
@@ -270,14 +269,14 @@
 
     public ZooKeeper(String host, int sessionTimeout, Watcher watcher)
             throws IOException {
-        this.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher());
+        watchManager.defaultWatcher = watcher;
+        cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager);
     }
 
     public ZooKeeper(String host, int sessionTimeout, Watcher watcher,
             long sessionId, byte[] sessionPasswd) throws IOException {
-        this.defaultWatcher = watcher;
-        cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(),
+        watchManager.defaultWatcher = watcher;
+        cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager,
                 sessionId, sessionPasswd);
     }
 
@@ -301,7 +300,7 @@
     }
 
     public synchronized void register(Watcher watcher) {
-        this.defaultWatcher = watcher;
+        watchManager.defaultWatcher = watcher;
     }
 
     /**
@@ -503,7 +502,8 @@
         SetDataResponse response = new SetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
+            wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
+                    path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -537,7 +537,7 @@
     public Stat exists(String path, boolean watch) throws KeeperException,
         InterruptedException
     {
-        return exists(path, watch ? defaultWatcher : null);
+        return exists(path, watch ? watchManager.defaultWatcher : null);
     }
 
     /**
@@ -557,7 +557,8 @@
         SetDataResponse response = new SetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
+            wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
+                    path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);
@@ -570,7 +571,7 @@
      * @see #exists(String, boolean)
      */
     public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
-        exists(path, watch ? defaultWatcher : null, cb, ctx);
+        exists(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
     }
 
     /**
@@ -601,7 +602,8 @@
         GetDataResponse response = new GetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(dataWatches, watcher, path);
+            wcb = new WatchRegistration(watchManager.dataWatches, watcher,
+                    path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -633,7 +635,7 @@
      */
     public byte[] getData(String path, boolean watch, Stat stat)
             throws KeeperException, InterruptedException {
-        return getData(path, watch ? defaultWatcher : null, stat);
+        return getData(path, watch ? watchManager.defaultWatcher : null, stat);
     }
 
     /**
@@ -651,7 +653,8 @@
         GetDataResponse response = new GetDataResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(dataWatches, watcher, path);
+            wcb = new WatchRegistration(watchManager.dataWatches, watcher,
+                    path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);
@@ -664,7 +667,7 @@
      * @see #getData(String, boolean, Stat)
      */
     public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
-        getData(path, watch ? defaultWatcher : null, cb, ctx);
+        getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
     }
 
     /**
@@ -862,7 +865,8 @@
         GetChildrenResponse response = new GetChildrenResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(childWatches, watcher, path);
+            wcb = new WatchRegistration(watchManager.childWatches, watcher,
+                    path);
         }
         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
@@ -893,7 +897,7 @@
      */
     public List<String> getChildren(String path, boolean watch)
             throws KeeperException, InterruptedException {
-        return getChildren(path, watch ? defaultWatcher : null);
+        return getChildren(path, watch ? watchManager.defaultWatcher : null);
     }
 
     /**
@@ -912,7 +916,8 @@
         GetChildrenResponse response = new GetChildrenResponse();
         WatchRegistration wcb = null;
         if (watcher != null) {
-            wcb = new WatchRegistration(childWatches, watcher, path);
+            wcb = new WatchRegistration(watchManager.childWatches, watcher,
+                    path);
         }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
                         ctx, wcb);
@@ -926,7 +931,7 @@
      */
     public void getChildren(String path, boolean watch, ChildrenCallback cb,
             Object ctx) {
-        getChildren(path, watch ? defaultWatcher : null, cb, ctx);
+        getChildren(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
     }
 
     /**


Reply via email to