Author: phunt
Date: Thu Oct 21 00:44:05 2010
New Revision: 1025801

URL: http://svn.apache.org/viewvc?rev=1025801&view=rev
Log:
ZOOKEEPER-794. Callbacks are not invoked when the client is closed

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=1025801&r1=1025800&r2=1025801&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Oct 21 00:44:05 2010
@@ -131,6 +131,9 @@ BUGFIXES: 
   ZOOKEEPER-820. update c unit tests to ensure "zombie" java server
   processes don't cause failure (Michi Mutsuzaki via phunt)
 
+  ZOOKEEPER-794. Callbacks are not invoked when the client is closed
+  (Alexis Midon via phunt)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1025801&r1=1025800&r2=1025801&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Oct 21 00:44:05 2010
@@ -451,6 +451,9 @@ public class ClientCnxn {
          */
         private volatile KeeperState sessionState = KeeperState.Disconnected;
 
+       private volatile boolean wasKilled = false;
+       private volatile boolean isRunning = false;
+
         EventThread() {
             super(makeThreadName("-EventThread"));
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
@@ -473,9 +476,16 @@ public class ClientCnxn {
             waitingEvents.add(pair);
         }
 
-        public void queuePacket(Packet packet) {
-            waitingEvents.add(packet);
-        }
+       public void queuePacket(Packet packet) {
+          if (wasKilled) {
+             synchronized (waitingEvents) {
+                if (isRunning) waitingEvents.add(packet);
+                else processEvent(packet);
+             }
+          } else {
+             waitingEvents.add(packet);
+          }
+       }
 
         public void queueEventOfDeath() {
             waitingEvents.add(eventOfDeath);
@@ -483,119 +493,131 @@ public class ClientCnxn {
 
         @Override
         public void run() {
-            try {
-                while (true) {
-                    Object event = waitingEvents.take();
-                    try {
-                        if (event == eventOfDeath) {
-                            return;
-                        }
-
-                        if (event instanceof WatcherSetEventPair) {
-                            // each watcher will process the event
-                            WatcherSetEventPair pair = (WatcherSetEventPair) event;
-                            for (Watcher watcher : pair.watchers) {
-                                try {
-                                    watcher.process(pair.event);
-                                } catch (Throwable t) {
-                                    LOG.error("Error while calling watcher ", t);
-                                }
-                            }
-                        } else {
-                            Packet p = (Packet) event;
-                            int rc = 0;
-                            String clientPath = p.clientPath;
-                            if (p.replyHeader.getErr() != 0) {
-                                rc = p.replyHeader.getErr();
-                            }
-                            if (p.cb == null) {
-                                LOG.warn("Somehow a null cb got to EventThread!");
-                            } else if (p.response instanceof ExistsResponse
-                                    || p.response instanceof SetDataResponse
-                                    || p.response instanceof SetACLResponse) {
-                                StatCallback cb = (StatCallback) p.cb;
-                                if (rc == 0) {
-                                    if (p.response instanceof ExistsResponse) {
-                                        cb.processResult(rc, clientPath, p.ctx,
-                                                ((ExistsResponse) p.response)
-                                                        .getStat());
-                                    } else if (p.response instanceof SetDataResponse) {
-                                        cb.processResult(rc, clientPath, p.ctx,
-                                                ((SetDataResponse) p.response)
-                                                        .getStat());
-                                    } else if (p.response instanceof SetACLResponse) {
-                                        cb.processResult(rc, clientPath, p.ctx,
-                                                ((SetACLResponse) p.response)
-                                                        .getStat());
-                                    }
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null);
-                                }
-                            } else if (p.response instanceof GetDataResponse) {
-                                DataCallback cb = (DataCallback) p.cb;
-                                GetDataResponse rsp = (GetDataResponse) p.response;
-                                if (rc == 0) {
-                                    cb.processResult(rc, clientPath, p.ctx, rsp
-                                            .getData(), rsp.getStat());
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null,
-                                            null);
-                                }
-                            } else if (p.response instanceof GetACLResponse) {
-                                ACLCallback cb = (ACLCallback) p.cb;
-                                GetACLResponse rsp = (GetACLResponse) p.response;
-                                if (rc == 0) {
-                                    cb.processResult(rc, clientPath, p.ctx, rsp
-                                            .getAcl(), rsp.getStat());
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null,
-                                            null);
-                                }
-                            } else if (p.response instanceof GetChildrenResponse) {
-                                ChildrenCallback cb = (ChildrenCallback) p.cb;
-                                GetChildrenResponse rsp = (GetChildrenResponse) p.response;
-                                if (rc == 0) {
-                                    cb.processResult(rc, clientPath, p.ctx, rsp
-                                            .getChildren());
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null);
-                                }
-                            } else if (p.response instanceof GetChildren2Response) {
-                                Children2Callback cb = (Children2Callback) p.cb;
-                                GetChildren2Response rsp = (GetChildren2Response) p.response;
-                                if (rc == 0) {
-                                    cb.processResult(rc, clientPath, p.ctx, rsp
-                                            .getChildren(), rsp.getStat());
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null, null);
-                                }
-                            } else if (p.response instanceof CreateResponse) {
-                                StringCallback cb = (StringCallback) p.cb;
-                                CreateResponse rsp = (CreateResponse) p.response;
-                                if (rc == 0) {
-                                    cb.processResult(rc, clientPath, p.ctx,
-                                            (chrootPath == null
-                                                    ? rsp.getPath()
-                                                    : rsp.getPath()
-                                              .substring(chrootPath.length())));
-                                } else {
-                                    cb.processResult(rc, clientPath, p.ctx, null);
-                                }
-                            } else if (p.cb instanceof VoidCallback) {
-                                VoidCallback cb = (VoidCallback) p.cb;
-                                cb.processResult(rc, clientPath, p.ctx);
-                            }
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Caught unexpected throwable", t);
-                    }
-                }
-            } catch (InterruptedException e) {
-                LOG.error("Event thread exiting due to interruption", e);
-            }
+           try {
+              isRunning = true;
+              while (true) {
+                 Object event = waitingEvents.take();
+                 if (event == eventOfDeath) {
+                    wasKilled = true;
+                 } else {
+                    processEvent(event);
+                 }
+                 if (wasKilled)
+                    synchronized (waitingEvents) {
+                       if (waitingEvents.isEmpty()) {
+                          isRunning = false;
+                          break;
+                       }
+                    }
+              }
+           } catch (InterruptedException e) {
+              LOG.error("Event thread exiting due to interruption", e);
+           }
 
             LOG.info("EventThread shut down");
         }
+
+       private void processEvent(Object event) {
+          try {
+              if (event instanceof WatcherSetEventPair) {
+                  // each watcher will process the event
+                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
+                  for (Watcher watcher : pair.watchers) {
+                      try {
+                          watcher.process(pair.event);
+                      } catch (Throwable t) {
+                          LOG.error("Error while calling watcher ", t);
+                      }
+                  }
+              } else {
+                  Packet p = (Packet) event;
+                  int rc = 0;
+                  String clientPath = p.clientPath;
+                  if (p.replyHeader.getErr() != 0) {
+                      rc = p.replyHeader.getErr();
+                  }
+                  if (p.cb == null) {
+                      LOG.warn("Somehow a null cb got to EventThread!");
+                  } else if (p.response instanceof ExistsResponse
+                          || p.response instanceof SetDataResponse
+                          || p.response instanceof SetACLResponse) {
+                      StatCallback cb = (StatCallback) p.cb;
+                      if (rc == 0) {
+                          if (p.response instanceof ExistsResponse) {
+                              cb.processResult(rc, clientPath, p.ctx,
+                                      ((ExistsResponse) p.response)
+                                              .getStat());
+                          } else if (p.response instanceof SetDataResponse) {
+                              cb.processResult(rc, clientPath, p.ctx,
+                                      ((SetDataResponse) p.response)
+                                              .getStat());
+                          } else if (p.response instanceof SetACLResponse) {
+                              cb.processResult(rc, clientPath, p.ctx,
+                                      ((SetACLResponse) p.response)
+                                              .getStat());
+                          }
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null);
+                      }
+                  } else if (p.response instanceof GetDataResponse) {
+                      DataCallback cb = (DataCallback) p.cb;
+                      GetDataResponse rsp = (GetDataResponse) p.response;
+                      if (rc == 0) {
+                          cb.processResult(rc, clientPath, p.ctx, rsp
+                                  .getData(), rsp.getStat());
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null,
+                                  null);
+                      }
+                  } else if (p.response instanceof GetACLResponse) {
+                      ACLCallback cb = (ACLCallback) p.cb;
+                      GetACLResponse rsp = (GetACLResponse) p.response;
+                      if (rc == 0) {
+                          cb.processResult(rc, clientPath, p.ctx, rsp
+                                  .getAcl(), rsp.getStat());
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null,
+                                  null);
+                      }
+                  } else if (p.response instanceof GetChildrenResponse) {
+                      ChildrenCallback cb = (ChildrenCallback) p.cb;
+                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
+                      if (rc == 0) {
+                          cb.processResult(rc, clientPath, p.ctx, rsp
+                                  .getChildren());
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null);
+                      }
+                  } else if (p.response instanceof GetChildren2Response) {
+                      Children2Callback cb = (Children2Callback) p.cb;
+                      GetChildren2Response rsp = (GetChildren2Response) p.response;
+                      if (rc == 0) {
+                          cb.processResult(rc, clientPath, p.ctx, rsp
+                                  .getChildren(), rsp.getStat());
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null, null);
+                      }
+                  } else if (p.response instanceof CreateResponse) {
+                      StringCallback cb = (StringCallback) p.cb;
+                      CreateResponse rsp = (CreateResponse) p.response;
+                      if (rc == 0) {
+                          cb.processResult(rc, clientPath, p.ctx,
+                                  (chrootPath == null
+                                          ? rsp.getPath()
+                                          : rsp.getPath()
+                                    .substring(chrootPath.length())));
+                      } else {
+                          cb.processResult(rc, clientPath, p.ctx, null);
+                      }
+                  } else if (p.cb instanceof VoidCallback) {
+                      VoidCallback cb = (VoidCallback) p.cb;
+                      cb.processResult(rc, clientPath, p.ctx);
+                  }
+              }
+          } catch (Throwable t) {
+              LOG.error("Caught unexpected throwable", t);
+          }
+       }
     }
 
     private void finishPacket(Packet p) {
@@ -1243,9 +1265,9 @@ public class ClientCnxn {
         }
 
         public void close() {
-            zooKeeper.state = States.CLOSED;
             synchronized (this) {
-                selector.wakeup();
+               zooKeeper.state = States.CLOSED;
+               selector.wakeup();
             }
         }
     }

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=1025801&r1=1025800&r2=1025801&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Thu Oct 21 00:44:05 2010
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
@@ -211,6 +212,25 @@ public class SessionTest extends ZKTestC
         LOG.info("before close zk with session id 0x"
                 + Long.toHexString(zk.getSessionId()) + "!");
         zk.close();
+        try {
+            zk.getData("/e", false, stat);
+            Assert.fail("Should have received a SessionExpiredException");
+        } catch(KeeperException.SessionExpiredException e) {}
+        
+        AsyncCallback.DataCallback cb = new AsyncCallback.DataCallback() {
+            String status = "not done";
+            public void processResult(int rc, String p, Object c, byte[] b, Stat s) {
+                synchronized(this) { status = KeeperException.Code.get(rc).toString(); this.notify(); }
+            }
+           public String toString() { return status; }
+        };
+        zk.getData("/e", false, cb, null);
+        synchronized(cb) {
+            if (cb.toString().equals("not done")) {
+                cb.wait(1000);
+            }
+        }
+        Assert.assertEquals(KeeperException.Code.SESSIONEXPIRED.toString(), cb.toString());        
     }
 
     private List<Thread> findThreads(String name) {


Reply via email to