Author: phunt Date: Thu Oct 21 00:45:45 2010 New Revision: 1025802 URL: http://svn.apache.org/viewvc?rev=1025802&view=rev Log: ZOOKEEPER-794. Callbacks are not invoked when the client is closed
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1025802&r1=1025801&r2=1025802&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original) +++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Thu Oct 21 00:45:45 2010 @@ -50,6 +50,9 @@ BUGFIXES: ZOOKEEPER-820. update c unit tests to ensure "zombie" java server processes don't cause failure (Michi Mutsuzaki via phunt) + ZOOKEEPER-794. Callbacks are not invoked when the client is closed + (Alexis Midon via phunt) + IMPROVEMENTS: ZOOKEEPER-789. Improve FLE log messages (flavio via phunt) Modified: hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1025802&r1=1025801&r2=1025802&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu Oct 21 00:45:45 2010 @@ -454,6 +454,9 @@ public class ClientCnxn { */ private volatile KeeperState sessionState = KeeperState.Disconnected; + private volatile boolean wasKilled = false; + private volatile boolean isRunning = false; + EventThread() { super(makeThreadName("-EventThread")); setUncaughtExceptionHandler(uncaughtExceptionHandler); @@ -476,9 +479,16 @@ public class ClientCnxn { waitingEvents.add(pair); } - public void queuePacket(Packet packet) { - waitingEvents.add(packet); - } + public void queuePacket(Packet packet) { + if (wasKilled) { + synchronized (waitingEvents) { + if (isRunning) waitingEvents.add(packet); + else processEvent(packet); + } + } else { + waitingEvents.add(packet); + } + } public void queueEventOfDeath() { waitingEvents.add(eventOfDeath); @@ -486,119 +496,131 @@ public class ClientCnxn { @Override public void run() { - try { - while (true) { - Object event = waitingEvents.take(); - try { - if (event == eventOfDeath) { - return; - } - - if (event instanceof WatcherSetEventPair) { - // each watcher will process the event - WatcherSetEventPair pair = (WatcherSetEventPair) event; - for (Watcher watcher : pair.watchers) { - try { - watcher.process(pair.event); - } catch (Throwable t) { - LOG.error("Error while calling watcher ", t); - } - } - } else { - Packet p = (Packet) event; - int rc = 0; - String clientPath = p.clientPath; - if (p.replyHeader.getErr() != 0) { - rc = p.replyHeader.getErr(); - } - if (p.cb == null) { - LOG.warn("Somehow a null cb got to EventThread!"); - } else if (p.response instanceof ExistsResponse - || p.response instanceof SetDataResponse - || p.response instanceof SetACLResponse) { - StatCallback cb = (StatCallback) p.cb; - if (rc == 0) { - if (p.response instanceof ExistsResponse) { - cb.processResult(rc, clientPath, p.ctx, - ((ExistsResponse) p.response) - .getStat()); - } else if (p.response instanceof SetDataResponse) { - cb.processResult(rc, clientPath, p.ctx, - ((SetDataResponse) p.response) - .getStat()); - } else if (p.response instanceof SetACLResponse) { - cb.processResult(rc, clientPath, p.ctx, - ((SetACLResponse) p.response) - .getStat()); - } - } else { - cb.processResult(rc, clientPath, p.ctx, null); - } - } else if (p.response instanceof GetDataResponse) { - DataCallback cb = (DataCallback) p.cb; - GetDataResponse rsp = (GetDataResponse) p.response; - if (rc == 0) { - cb.processResult(rc, clientPath, p.ctx, rsp - .getData(), rsp.getStat()); - } else { - cb.processResult(rc, clientPath, p.ctx, null, - null); - } - } else if (p.response instanceof GetACLResponse) { - ACLCallback cb = (ACLCallback) p.cb; - GetACLResponse rsp = (GetACLResponse) p.response; - if (rc == 0) { - cb.processResult(rc, clientPath, p.ctx, rsp - .getAcl(), rsp.getStat()); - } else { - cb.processResult(rc, clientPath, p.ctx, null, - null); - } - } else if (p.response instanceof GetChildrenResponse) { - ChildrenCallback cb = (ChildrenCallback) p.cb; - GetChildrenResponse rsp = (GetChildrenResponse) p.response; - if (rc == 0) { - cb.processResult(rc, clientPath, p.ctx, rsp - .getChildren()); - } else { - cb.processResult(rc, clientPath, p.ctx, null); - } - } else if (p.response instanceof GetChildren2Response) { - Children2Callback cb = (Children2Callback) p.cb; - GetChildren2Response rsp = (GetChildren2Response) p.response; - if (rc == 0) { - cb.processResult(rc, clientPath, p.ctx, rsp - .getChildren(), rsp.getStat()); - } else { - cb.processResult(rc, clientPath, p.ctx, null, null); - } - } else if (p.response instanceof CreateResponse) { - StringCallback cb = (StringCallback) p.cb; - CreateResponse rsp = (CreateResponse) p.response; - if (rc == 0) { - cb.processResult(rc, clientPath, p.ctx, - (chrootPath == null - ? rsp.getPath() - : rsp.getPath() - .substring(chrootPath.length()))); - } else { - cb.processResult(rc, clientPath, p.ctx, null); - } - } else if (p.cb instanceof VoidCallback) { - VoidCallback cb = (VoidCallback) p.cb; - cb.processResult(rc, clientPath, p.ctx); - } - } - } catch (Throwable t) { - LOG.error("Caught unexpected throwable", t); - } - } - } catch (InterruptedException e) { - LOG.error("Event thread exiting due to interruption", e); - } + try { + isRunning = true; + while (true) { + Object event = waitingEvents.take(); + if (event == eventOfDeath) { + wasKilled = true; + } else { + processEvent(event); + } + if (wasKilled) + synchronized (waitingEvents) { + if (waitingEvents.isEmpty()) { + isRunning = false; + break; + } + } + } + } catch (InterruptedException e) { + LOG.error("Event thread exiting due to interruption", e); + } LOG.info("EventThread shut down"); } + + private void processEvent(Object event) { + try { + if (event instanceof WatcherSetEventPair) { + // each watcher will process the event + WatcherSetEventPair pair = (WatcherSetEventPair) event; + for (Watcher watcher : pair.watchers) { + try { + watcher.process(pair.event); + } catch (Throwable t) { + LOG.error("Error while calling watcher ", t); + } + } + } else { + Packet p = (Packet) event; + int rc = 0; + String clientPath = p.clientPath; + if (p.replyHeader.getErr() != 0) { + rc = p.replyHeader.getErr(); + } + if (p.cb == null) { + LOG.warn("Somehow a null cb got to EventThread!"); + } else if (p.response instanceof ExistsResponse + || p.response instanceof SetDataResponse + || p.response instanceof SetACLResponse) { + StatCallback cb = (StatCallback) p.cb; + if (rc == 0) { + if (p.response instanceof ExistsResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((ExistsResponse) p.response) + .getStat()); + } else if (p.response instanceof SetDataResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetDataResponse) p.response) + .getStat()); + } else if (p.response instanceof SetACLResponse) { + cb.processResult(rc, clientPath, p.ctx, + ((SetACLResponse) p.response) + .getStat()); + } + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetDataResponse) { + DataCallback cb = (DataCallback) p.cb; + GetDataResponse rsp = (GetDataResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getData(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetACLResponse) { + ACLCallback cb = (ACLCallback) p.cb; + GetACLResponse rsp = (GetACLResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getAcl(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, + null); + } + } else if (p.response instanceof GetChildrenResponse) { + ChildrenCallback cb = (ChildrenCallback) p.cb; + GetChildrenResponse rsp = (GetChildrenResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren()); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.response instanceof GetChildren2Response) { + Children2Callback cb = (Children2Callback) p.cb; + GetChildren2Response rsp = (GetChildren2Response) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, rsp + .getChildren(), rsp.getStat()); + } else { + cb.processResult(rc, clientPath, p.ctx, null, null); + } + } else if (p.response instanceof CreateResponse) { + StringCallback cb = (StringCallback) p.cb; + CreateResponse rsp = (CreateResponse) p.response; + if (rc == 0) { + cb.processResult(rc, clientPath, p.ctx, + (chrootPath == null + ? rsp.getPath() + : rsp.getPath() + .substring(chrootPath.length()))); + } else { + cb.processResult(rc, clientPath, p.ctx, null); + } + } else if (p.cb instanceof VoidCallback) { + VoidCallback cb = (VoidCallback) p.cb; + cb.processResult(rc, clientPath, p.ctx); + } + } + } catch (Throwable t) { + LOG.error("Caught unexpected throwable", t); + } + } } private void finishPacket(Packet p) { @@ -1231,9 +1253,9 @@ public class ClientCnxn { } public void close() { - zooKeeper.state = States.CLOSED; synchronized (this) { - selector.wakeup(); + zooKeeper.state = States.CLOSED; + selector.wakeup(); } } } Modified: hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=1025802&r1=1025801&r2=1025802&view=diff ============================================================================== --- hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java (original) +++ hadoop/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/SessionTest.java Thu Oct 21 00:45:45 2010 @@ -32,18 +32,20 @@ import java.util.concurrent.TimeUnit; import junit.framework.TestCase; import org.apache.log4j.Logger; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.NIOServerCnxn; import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.Assert; import org.junit.Test; public class SessionTest extends TestCase implements Watcher { @@ -195,6 +197,25 @@ public class SessionTest extends TestCas LOG.info("before close zk with session id 0x" + Long.toHexString(zk.getSessionId()) + "!"); zk.close(); + try { + zk.getData("/e", false, stat); + Assert.fail("Should have received a SessionExpiredException"); + } catch(KeeperException.SessionExpiredException e) {} + + AsyncCallback.DataCallback cb = new AsyncCallback.DataCallback() { + String status = "not done"; + public void processResult(int rc, String p, Object c, byte[] b, Stat s) { + synchronized(this) { status = KeeperException.Code.get(rc).toString(); this.notify(); } + } + public String toString() { return status; } + }; + zk.getData("/e", false, cb, null); + synchronized(cb) { + if (cb.toString().equals("not done")) { + cb.wait(1000); + } + } + Assert.assertEquals(KeeperException.Code.SESSIONEXPIRED.toString(), cb.toString()); } private List<Thread> findThreads(String name) {