Repository: zookeeper Updated Branches: refs/heads/master 0b65e3d4c -> 95557a30e
ZOOKEEPER-3131: Remove watcher when session closed in NettyServerCnxn Currently, NettyServerCnxn doesn't remove itself from the ZK server when the cnxn is closed, which leaks watchers. Remove it on close to align with the NIO implementation. Author: Fangmin Lyu <allen...@fb.com> Reviewers: hanm, anmolnar, nkalmar Closes #612 from lvfangmin/ZOOKEEPER-3131 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/95557a30 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/95557a30 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/95557a30 Branch: refs/heads/master Commit: 95557a30edbdfdf4479a1cb142e0d82a4ba6061d Parents: 0b65e3d Author: Fangmin Lyu <allen...@fb.com> Authored: Thu Sep 6 17:34:55 2018 -0700 Committer: Michael Han <h...@apache.org> Committed: Thu Sep 6 17:34:55 2018 -0700 ---------------------------------------------------------------------- .../apache/zookeeper/server/NettyServerCnxn.java | 18 +++++++++++------- .../zookeeper/server/NettyServerCnxnTest.java | 9 +++++++-- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/95557a30/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java index ec808a6..948fb3a 100644 --- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java @@ -73,7 +73,7 @@ public class NettyServerCnxn extends ServerCnxn { NettyServerCnxnFactory factory; boolean initialized; - + NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { this.channel = channel; this.closingChannel = false; @@ -83,11 +83,11 @@ public class 
NettyServerCnxn extends ServerCnxn { this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login); } } - + @Override public void close() { closingChannel = true; - + if (LOG.isDebugEnabled()) { LOG.debug("close called for sessionid:0x" + Long.toHexString(sessionId)); @@ -119,6 +119,10 @@ public class NettyServerCnxn extends ServerCnxn { } } + if (zkServer != null) { + zkServer.removeCnxn(this); + } + if (channel.isOpen()) { // Since we don't check on the futures created by write calls to the channel complete we need to make sure // that all writes have been completed before closing the channel or we risk data loss @@ -174,7 +178,7 @@ public class NettyServerCnxn extends ServerCnxn { @Override public ChannelFuture getFuture() {return null;} }; - + @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { @@ -226,7 +230,7 @@ public class NettyServerCnxn extends ServerCnxn { */ private class SendBufferWriter extends Writer { private StringBuffer sb = new StringBuffer(); - + /** * Check if we are ready to send another chunk. 
* @param force force sending, even if not a full chunk @@ -415,7 +419,7 @@ public class NettyServerCnxn extends ServerCnxn { public void disableRecv() { disableRecvNoWait().awaitUninterruptibly(); } - + private ChannelFuture disableRecvNoWait() { throttled = true; if (LOG.isDebugEnabled()) { @@ -423,7 +427,7 @@ public class NettyServerCnxn extends ServerCnxn { } return channel.setReadable(false); } - + @Override public long getOutstandingRequests() { return outstandingCount.longValue(); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/95557a30/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java ---------------------------------------------------------------------- diff --git a/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java b/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java index 2038d8b..15f993c 100644 --- a/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java +++ b/src/java/test/org/apache/zookeeper/server/NettyServerCnxnTest.java @@ -56,7 +56,7 @@ public class NettyServerCnxnTest extends ClientBase { * servercnxnfactory should remove all channel references to avoid * duplicate channel closure. Duplicate closure may result in indefinite * hanging due to netty open issue. 
- * + * * @see <a href="https://issues.jboss.org/browse/NETTY-412">NETTY-412</a> */ @Test(timeout = 40000) @@ -66,13 +66,16 @@ public class NettyServerCnxnTest extends ClientBase { serverFactory instanceof NettyServerCnxnFactory); final ZooKeeper zk = createClient(); + final ZooKeeperServer zkServer = getServer(serverFactory); final String path = "/a"; try { // make sure zkclient works zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // set on watch Assert.assertNotNull("Didn't create znode:" + path, - zk.exists(path, false)); + zk.exists(path, true)); + Assert.assertEquals(1, zkServer.getZKDatabase().getDataTree().getWatchCount()); Iterable<ServerCnxn> connections = serverFactory.getConnections(); Assert.assertEquals("Mismatch in number of live connections!", 1, serverFactory.getNumAliveConnections()); @@ -88,6 +91,8 @@ public class NettyServerCnxnTest extends ClientBase { Assert.fail("The number of live connections should be 0"); } } + // make sure the watch is removed when the connection closed + Assert.assertEquals(0, zkServer.getZKDatabase().getDataTree().getWatchCount()); } finally { zk.close(); }