[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/zookeeper/pull/669


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-21 Thread ivmaykov
GitHub user ivmaykov reopened a pull request:

https://github.com/apache/zookeeper/pull/669

ZOOKEEPER-3152: Port ZK netty stack to netty4

Summary: Ported the client connection netty stack from netty3 to netty4. 
This includes both the server side (NettyServerCnxn and friends) and the client 
side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #669


commit 4fb8eb6ebe69a4f9a0852624d652d0893d6ba625
Author: Ilya Maykov 
Date:   2018-08-31T23:26:55Z

port ZK netty stack from netty3 to netty4

Summary:
Ported the client connection netty stack from netty3 to netty4. This 
includes both the server side
(NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

Reviewers: nixon, nwolchko, nedelchev

Subscribers:

Differential Revision: https://phabricator.intern.facebook.com/D9646262

Tasks:

Tags:

Blame Revision:




---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-21 Thread ivmaykov
Github user ivmaykov closed the pull request at:

https://github.com/apache/zookeeper/pull/669


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233664524
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
 handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
+LOG.trace("Channel inactive {}", ctx.channel());
 }
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
+LOG.trace("Channel inactive caused close {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+LOG.warn("Exception caught", cause);
+NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isDebugEnabled()) {
-LOG.debug("Closing " + cnxn);
+LOG.debug("Closing {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("message received called " + e.getMessage());
-}
+public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
 try {
-if (LOG.isDebugEnabled()) {
-LOG.debug("New message " + e.toString()
-+ " from " + ctx.getChannel());
-}
-NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
-synchronized(cnxn) {
-processMessage(e, cnxn);
+if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+LOG.debug("Received AutoReadEvent.ENABLE");
+NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+// TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
--- End diff --

yep!


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233663529
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

Ah, right. I think I misunderstood. So the code is good as-is, yes?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233663197
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

That's why I'm saying to keep removing the connection attribute in both 
methods. :)



---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233661987
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

It will not be called twice, since removing the connection attribute means 
the second time we will get null and there is a null check in both places.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233659698
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

If `cnxn.close();` triggers the channelInactive event too, you need to remove 
the connection attribute here; otherwise `close()` will be called twice.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233659921
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
 if (KeeperException.Code.OK !=
 authProvider.handleAuthentication(cnxn, null)) {
 LOG.error("Authentication failed for session 0x{}",
-Long.toHexString(cnxn.sessionId));
+Long.toHexString(cnxn.getSessionId()));
 cnxn.close();
 return;
 }
 
-allChannels.add(future.getChannel());
+final Channel futureChannel = future.getNow();
--- End diff --

It's fine.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233651445
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

I think they are equivalent here since we know the future is completed (we 
are inside an `if (future.isSuccess())` block). I can change it to `get()` if 
you prefer.
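
A minimal sketch of the equivalence argued above, using the JDK's `CompletableFuture` as a stand-in for Netty's future type. This is an assumption for illustration only: Netty's `Future.getNow()` takes no argument and returns null when the future is not yet done, while the JDK variant takes a fallback value. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; not part of the ZooKeeper patch.
class CompletedFutureDemo {
    // Completes a future with the given value, then reads it both ways.
    // Returns {result of getNow, result of get}.
    static String[] readBothWays(String value) {
        CompletableFuture<String> future = CompletableFuture.completedFuture(value);
        // Non-blocking read. The JDK variant takes a fallback that is only
        // used when the future is incomplete, which cannot happen here.
        String viaGetNow = future.getNow(null);
        String viaGet;
        try {
            // Blocking read; returns immediately because the future is done.
            viaGet = future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("unexpected for a completed future", e);
        }
        return new String[] {viaGetNow, viaGet};
    }
}
```

The two reads only differ on an incomplete future, where `get()` blocks and `getNow()` does not.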


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233651730
  
--- Diff: 
zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer leaks.
+ *
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
+private static AtomicReference<TestByteBufAllocator> INSTANCE =
+new AtomicReference<>(null);
+
+/**
+ * Get the singleton testing allocator.
+ * @return the singleton allocator, creating it if one does not exist.
+ */
+public static TestByteBufAllocator getInstance() {
+TestByteBufAllocator result = INSTANCE.get();
+if (result == null) {
+ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
+ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel));
+result = INSTANCE.get();
+}
+return result;
+}
+
+/**
+ * Destroys the singleton testing allocator and throws an error if any of the
+ * buffers allocated by it have been leaked. Attempts to print leak details to
+ * standard error before throwing, by using netty's built-in leak tracking.
+ * Note that this might not always work, since it only triggers when a buffer
+ * is garbage-collected and calling System.gc() does not guarantee that a buffer
+ * will actually be GC'ed.
+ *
+ * This should be called at the end of a unit test's tearDown() method.
+ */
+public static void checkForLeaks() {
+TestByteBufAllocator result = INSTANCE.getAndSet(null);
+if (result != null) {
+result.checkInstanceForLeaks();
+}
+}
+
+private final List<ByteBuf> trackedBuffers = new ArrayList<>();
+private final ResourceLeakDetector.Level oldLevel;
+
+private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
+{
+super(false);
+this.oldLevel = oldLevel;
+}
+
+@Override
+protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newHeapBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newDirectBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+{
+return track(super.compositeHeapBuffer(maxNumComponents));
+}
+
+


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233650367
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
--- End diff --

I do remove it in both places, by calling 
`ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null)`. The getAndSet(null) 
will atomically return the old value of the attribute and set the new value to 
null.

Now that I think about it, I'm not sure if we need to remove the attribute 
in `exceptionCaught()`. We can probably leave it and let `channelInactive()` 
take care of removing the attribute. Let me know if you want me to make this 
change; I think it probably doesn't matter too much either way.
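
A minimal sketch of why `getAndSet(null)` plus a null check keeps `close()` from running twice. It uses a plain `AtomicReference` as a stand-in for the channel attribute, and the class names `FakeCnxn` and `CloseOnceHandler` are hypothetical, chosen only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for NettyServerCnxn: it only counts close() calls.
class FakeCnxn {
    final AtomicInteger closeCalls = new AtomicInteger();

    void close() {
        closeCalls.incrementAndGet();
    }
}

// Hypothetical stand-in for the channel handler. The AtomicReference plays
// the role of ctx.channel().attr(CONNECTION_ATTRIBUTE).
class CloseOnceHandler {
    private final AtomicReference<FakeCnxn> connectionAttribute = new AtomicReference<>();

    void channelActive(FakeCnxn cnxn) {
        connectionAttribute.set(cnxn);
    }

    void exceptionCaught() {
        closeIfPresent();
    }

    void channelInactive() {
        closeIfPresent();
    }

    private void closeIfPresent() {
        // Atomically take the connection out of the attribute.
        // Only the first caller sees a non-null value.
        FakeCnxn cnxn = connectionAttribute.getAndSet(null);
        if (cnxn != null) {
            cnxn.close();
        }
    }
}
```

Whichever of the two callbacks fires first closes the connection; the second one reads null and does nothing.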


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233652712
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +105,102 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return channel != null || connectFuture != null;
--- End diff --

As the comment above says, the `isConnected()` method is only used in the 
main loop inside `ClientCnxn$SendThread.run()` to see if a new connection 
should be initiated. So, this method should return true if a connection 
attempt is already in progress, which is the case when `connectFuture` is not 
null. Arguably the method should be called `isConnectedOrConnecting()`, but I 
didn't want to go around refactoring APIs in this diff. I can do it if you like.
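
A minimal sketch of the "connected or connecting" check described above. `ConnectState` is a hypothetical model, not the real `ClientCnxnSocketNetty`; plain `Object` fields stand in for the channel and the connect future, and clearing `connectFuture` on success is a simplification assumed for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the client's connection state; not the real class.
class ConnectState {
    private final ReentrantLock connectLock = new ReentrantLock();
    private Object channel;        // non-null once connected
    private Object connectFuture;  // non-null while a connect attempt is pending

    // True when connected or when a connect attempt is already in flight,
    // so a caller polling this will not start a second attempt.
    boolean isConnectedOrConnecting() {
        connectLock.lock();
        try {
            return channel != null || connectFuture != null;
        } finally {
            connectLock.unlock();
        }
    }

    void startConnect() {
        connectLock.lock();
        try {
            connectFuture = new Object();
        } finally {
            connectLock.unlock();
        }
    }

    void connectSucceeded() {
        connectLock.lock();
        try {
            channel = new Object();
            connectFuture = null; // assumption: the pending marker is cleared on success
        } finally {
            connectLock.unlock();
        }
    }

    void cleanup() {
        connectLock.lock();
        try {
            channel = null;
            connectFuture = null;
        } finally {
            connectLock.unlock();
        }
    }
}
```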


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233653584
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +105,102 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return channel != null || connectFuture != null;
+} finally {
+connectLock.unlock();
+}
+}
+
+private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
+ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+if (testAllocator != null) {
+return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+} else {
+return bootstrap;
+}
 }
 
 @Override
 void connect(InetSocketAddress addr) throws IOException {
 firstConnect = new CountDownLatch(1);
 
-ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-bootstrap.setOption("soLinger", -1);
-bootstrap.setOption("tcpNoDelay", true);
-
-connectFuture = bootstrap.connect(addr);
-connectFuture.addListener(new ChannelFutureListener() {
-@Override
-public void operationComplete(ChannelFuture channelFuture) throws Exception {
-// this lock guarantees that channel won't be assgined after cleanup().
-connectLock.lock();
-try {
-if (!channelFuture.isSuccess() || connectFuture == null) {
-LOG.info("future isn't success, cause: {}", channelFuture.getCause());
-return;
-}
-// setup channel, variables, connection, etc.
-channel = channelFuture.getChannel();
-
-disconnected.set(false);
-initialized = false;
-lenBuffer.clear();
-incomingBuffer = lenBuffer;
-
-sendThread.primeConnection();
-updateNow();
-updateLastSendAndHeard();
-
-if (sendThread.tunnelAuthInProgress()) {
-waitSasl.drainPermits();
-needSasl.set(true);
-sendPrimePacket();
-} else {
-needSasl.set(false);
-}
+Bootstrap bootstrap = new Bootstrap()
+.group(eventLoopGroup)
+.channel(NettyUtils.nioOrEpollSocketChannel())
+.option(ChannelOption.SO_LINGER, -1)
+.option(ChannelOption.TCP_NODELAY, true)
+.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
+bootstrap = configureBootstrapAllocator(bootstrap);
+bootstrap.validate();
 
-// we need to wake up on first connect to avoid timeout.
-wakeupCnxn();
-firstConnect.countDown();
-LOG.info("channel is connected: {}", channelFuture.getChannel());
-} finally {
-connectLock.unlock();
+connectLock.lock();
+try {
+connectFuture = bootstrap.connect(addr);
+connectFuture.addListener(new ChannelFutureListener() {
+@Override
+public void operationComplete(ChannelFuture channelFuture) throws Exception {
+// this lock guarantees that channel won't be assigned after cleanup().
+connectLock.lock();
+try {
+if (!channelFuture.isSuccess()) {
+LOG.info("future isn't success, cause:", channelFuture.cause());
+return;
+} else if (connectFuture == null) {
--- End diff --

As the comment below says, there could be a race if the connect attempt was 
cancelled right around the time the listener callback fired. `cleanup()` below 
will cancel an in-progress connection attempt and clear the `connectFuture` 
variable. If this happens, `connectFuture` will be null here.
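
The race described above can be reproduced without Netty. The following is only a minimal sketch using JDK types: `ConnectRaceSketch`, `lateChannelClosed`, and the use of `CompletableFuture<String>` in place of `ChannelFuture` are assumptions made for illustration and are not part of the patch. Note that the sketch's `cleanup()` only clears the field, to model a cancel that arrives too late to stop the connect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the client socket; strings play the role of channels.
final class ConnectRaceSketch {
    private final Lock connectLock = new ReentrantLock();
    private CompletableFuture<String> connectFuture; // guarded by connectLock
    private String channel;                          // guarded by connectLock
    private boolean lateChannelClosed;               // guarded by connectLock

    /** Starts a simulated connect; the caller completes the returned future. */
    public CompletableFuture<String> connect() {
        CompletableFuture<String> attempt = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = attempt;
            attempt.whenComplete(this::onConnectDone);
        } finally {
            connectLock.unlock();
        }
        return attempt;
    }

    /** Abandons any in-progress attempt by clearing the field under the lock. */
    public void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    /** Listener callback: runs in whichever thread completes the future. */
    private void onConnectDone(String connected, Throwable error) {
        connectLock.lock();
        try {
            if (error != null) {
                return; // the connect attempt failed
            } else if (connectFuture == null) {
                // cleanup() ran first: discard the channel instead of adopting it.
                lateChannelClosed = true;
                return;
            }
            channel = connected;
        } finally {
            connectLock.unlock();
        }
    }

    public String channel() {
        connectLock.lock();
        try { return channel; } finally { connectLock.unlock(); }
    }

    public boolean lateChannelClosed() {
        connectLock.lock();
        try { return lateChannelClosed; } finally { connectLock.unlock(); }
    }

    public static void main(String[] args) {
        ConnectRaceSketch raced = new ConnectRaceSketch();
        CompletableFuture<String> attempt = raced.connect();
        raced.cleanup();          // the cancel wins the race...
        attempt.complete("ch-1"); // ...but the connect succeeds anyway
        System.out.println(raced.channel() + " " + raced.lateChannelClosed());
    }
}
```

Because both `cleanup()` and the callback take `connectLock`, the callback sees either the live future or null, never a half-updated state, which is why the null branch is reachable even though `addListener` itself was called on a non-null future.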


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-14 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233649908
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
+LOG.trace("Channel inactive {}", ctx.channel());
 }
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
+LOG.trace("Channel inactive caused close {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
--- End diff --

We call `cnxn.close()` at the end of `exceptionCaught()`, which ends up closing 
the channel, so `channelInactive()` will get called. I think it would therefore 
be redundant to remove the channel from `allChannels` here.
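
The argument above relies on `getAndSet(null)` letting whichever handler runs first close the connection exactly once. A minimal JDK-only sketch of that flow follows; `CloseOnceSketch`, the `Runnable` standing in for `NettyServerCnxn`, and the string channel id are all hypothetical and chosen only for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of one channel and its handler; not the real Netty types.
final class CloseOnceSketch {
    private final Set<String> allChannels = new HashSet<>();
    private final AtomicReference<Runnable> connectionAttribute = new AtomicReference<>();
    private final String channel;
    private int closeCalls;

    public CloseOnceSketch(String channel) {
        this.channel = channel;
        allChannels.add(channel);
        connectionAttribute.set(this::closeConnection);
    }

    /** Detaches the connection (if still attached) and closes it. */
    public void exceptionCaught() {
        Runnable cnxn = connectionAttribute.getAndSet(null);
        if (cnxn != null) {
            cnxn.run();
        }
    }

    /** Untracks the channel, then closes the connection if nobody has yet. */
    public void channelInactive() {
        allChannels.remove(channel);
        Runnable cnxn = connectionAttribute.getAndSet(null);
        if (cnxn != null) {
            cnxn.run();
        }
    }

    /** Closing the connection closes the channel, which fires channelInactive(). */
    private void closeConnection() {
        closeCalls++;
        channelInactive();
    }

    public int closeCalls() { return closeCalls; }

    public int trackedChannels() { return allChannels.size(); }

    public static void main(String[] args) {
        CloseOnceSketch sketch = new CloseOnceSketch("ch-1");
        sketch.exceptionCaught();
        System.out.println(sketch.closeCalls() + " " + sketch.trackedChannels());
    }
}
```

In this model the channel is untracked through the `channelInactive()` path even when the close started in `exceptionCaught()`, and the second `getAndSet(null)` returns null, so the connection is not closed twice.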


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233289109
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +105,102 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return channel != null || connectFuture != null;
+} finally {
+connectLock.unlock();
+}
+}
+
+private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
+ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+if (testAllocator != null) {
+return bootstrap.option(ChannelOption.ALLOCATOR, 
testAllocator);
+} else {
+return bootstrap;
+}
 }
 
 @Override
 void connect(InetSocketAddress addr) throws IOException {
 firstConnect = new CountDownLatch(1);
 
-ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-bootstrap.setPipelineFactory(new 
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-bootstrap.setOption("soLinger", -1);
-bootstrap.setOption("tcpNoDelay", true);
-
-connectFuture = bootstrap.connect(addr);
-connectFuture.addListener(new ChannelFutureListener() {
-@Override
-public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
-// this lock guarantees that channel won't be assgined 
after cleanup().
-connectLock.lock();
-try {
-if (!channelFuture.isSuccess() || connectFuture == 
null) {
-LOG.info("future isn't success, cause: {}", 
channelFuture.getCause());
-return;
-}
-// setup channel, variables, connection, etc.
-channel = channelFuture.getChannel();
-
-disconnected.set(false);
-initialized = false;
-lenBuffer.clear();
-incomingBuffer = lenBuffer;
-
-sendThread.primeConnection();
-updateNow();
-updateLastSendAndHeard();
-
-if (sendThread.tunnelAuthInProgress()) {
-waitSasl.drainPermits();
-needSasl.set(true);
-sendPrimePacket();
-} else {
-needSasl.set(false);
-}
+Bootstrap bootstrap = new Bootstrap()
+.group(eventLoopGroup)
+.channel(NettyUtils.nioOrEpollSocketChannel())
+.option(ChannelOption.SO_LINGER, -1)
+.option(ChannelOption.TCP_NODELAY, true)
+.handler(new ZKClientPipelineFactory(addr.getHostString(), 
addr.getPort()));
+bootstrap = configureBootstrapAllocator(bootstrap);
+bootstrap.validate();
 
-// we need to wake up on first connect to avoid 
timeout.
-wakeupCnxn();
-firstConnect.countDown();
-LOG.info("channel is connected: {}", 
channelFuture.getChannel());
-} finally {
-connectLock.unlock();
+connectLock.lock();
+try {
+connectFuture = bootstrap.connect(addr);
+connectFuture.addListener(new ChannelFutureListener() {
+@Override
+public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
+// this lock guarantees that channel won't be assigned 
after cleanup().
+connectLock.lock();
+try {
+if (!channelFuture.isSuccess()) {
+LOG.info("future isn't success, cause:", 
channelFuture.cause());
+return;
+} else if (connectFuture == null) {
--- End diff --

How could `connectFuture` be null?
The `connectFuture.addListener` call would have already thrown an NPE in that case.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233288683
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +105,102 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return channel != null || connectFuture != null;
--- End diff --

Why would you like to check `connectFuture` too?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233279457
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
+LOG.trace("Channel inactive {}", ctx.channel());
 }
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
+LOG.trace("Channel inactive caused close {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+LOG.warn("Exception caught", cause);
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isDebugEnabled()) {
-LOG.debug("Closing " + cnxn);
+LOG.debug("Closing {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent e)
-throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("message received called " + e.getMessage());
-}
+public void userEventTriggered(ChannelHandlerContext ctx, Object 
evt) throws Exception {
 try {
-if (LOG.isDebugEnabled()) {
-LOG.debug("New message " + e.toString()
-+ " from " + ctx.getChannel());
-}
-NettyServerCnxn cnxn = 
(NettyServerCnxn)ctx.getAttachment();
-synchronized(cnxn) {
-processMessage(e, cnxn);
+if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+LOG.debug("Received AutoReadEvent.ENABLE");
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+// TODO(ilyam): Not sure if cnxn can be null here. It 
becomes null if channelInactive()
--- End diff --

Do you need to remove `cnxn` from the channel in the two events mentioned?
A null check wouldn't do any harm, though.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233282137
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -316,16 +251,17 @@ public void operationComplete(ChannelFuture future)
 if (KeeperException.Code.OK !=
 authProvider.handleAuthentication(cnxn, null)) 
{
 LOG.error("Authentication failed for session 0x{}",
-Long.toHexString(cnxn.sessionId));
+Long.toHexString(cnxn.getSessionId()));
 cnxn.close();
 return;
 }
 
-allChannels.add(future.getChannel());
+final Channel futureChannel = future.getNow();
--- End diff --

I think `get()` would be enough, but the check is harmless anyway.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233278670
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +116,104 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
+LOG.trace("Channel inactive {}", ctx.channel());
 }
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
 if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
+LOG.trace("Channel inactive caused close {}", cnxn);
 }
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
--- End diff --

You remove `ctx.channel()` from `allChannels` in `channelInactive()`, which 
was not the case in the original implementation, but I think it makes perfect 
sense.

Don't you want to do the same here?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-13 Thread anmolnar
Github user anmolnar commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r233286512
  
--- Diff: 
zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations 
and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from 
https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). 
Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer 
leaks.
+ *
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
+private static AtomicReference<TestByteBufAllocator> INSTANCE =
+new AtomicReference<>(null);
+
+/**
+ * Get the singleton testing allocator.
+ * @return the singleton allocator, creating it if one does not exist.
+ */
+public static TestByteBufAllocator getInstance() {
+TestByteBufAllocator result = INSTANCE.get();
+if (result == null) {
+ResourceLeakDetector.Level oldLevel = 
ResourceLeakDetector.getLevel();
+
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+INSTANCE.compareAndSet(null, new 
TestByteBufAllocator(oldLevel));
+result = INSTANCE.get();
+}
+return result;
+}
+
+/**
+ * Destroys the singleton testing allocator and throws an error if any 
of the
+ * buffers allocated by it have been leaked. Attempts to print leak 
details to
+ * standard error before throwing, by using netty's built-in leak 
tracking.
+ * Note that this might not always work, since it only triggers when a 
buffer
+ * is garbage-collected and calling System.gc() does not guarantee 
that a buffer
+ * will actually be GC'ed.
+ *
+ * This should be called at the end of a unit test's tearDown() method.
+ */
+public static void checkForLeaks() {
+TestByteBufAllocator result = INSTANCE.getAndSet(null);
+if (result != null) {
+result.checkInstanceForLeaks();
+}
+}
+
+private final List<ByteBuf> trackedBuffers = new ArrayList<>();
+private final ResourceLeakDetector.Level oldLevel;
+
+private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
+{
+super(false);
+this.oldLevel = oldLevel;
+}
+
+@Override
+protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newHeapBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newDirectBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+{
+return track(super.compositeHeapBuffer(maxNumComponents));
+}
+
+

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-07 Thread ivmaykov
GitHub user ivmaykov reopened a pull request:

https://github.com/apache/zookeeper/pull/669

ZOOKEEPER-3152: Port ZK netty stack to netty4

Summary: Ported the client connection netty stack from netty3 to netty4. 
This includes both the server side (NettyServerCnxn and friends) and the client 
side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #669


commit 94c4516bea9b46f5428ef29ff51490f9647eaac3
Author: Ilya Maykov 
Date:   2018-08-31T23:26:55Z

port ZK netty stack from netty3 to netty4

Summary:
Ported the client connection netty stack from netty3 to netty4. This 
includes both the server side
(NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

Reviewers: nixon, nwolchko, nedelchev

Subscribers:

Differential Revision: https://phabricator.intern.facebook.com/D9646262

Tasks:

Tags:

Blame Revision:




---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-11-07 Thread ivmaykov
Github user ivmaykov closed the pull request at:

https://github.com/apache/zookeeper/pull/669


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-21 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226849723
  
--- Diff: ivy.xml ---
@@ -59,9 +59,11 @@
 
 
-
-  
-
+
--- End diff --

I forgot about this.
I think it is better to use netty-all because it bundles all of the native 
artifacts.
In BookKeeper, for instance, we use netty-all, and this is common practice 
in many other projects.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226845944
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
--- End diff --

Do you want a similar check for LOG.debug() calls as well, or only 
LOG.trace()?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226839154
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

Essentially, you will save allocations if you don't need listeners.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226839114
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
--- End diff --

It is better to add the 'if' because in general you will skip calling the 
logger method, with everything that comes with it: evaluating expressions for 
parameters, passing parameters for the method call, and calling the method.
You trade a potential expense of resources and useless allocations for a 
single cheap method call.
IMHO it is better to use this pattern consistently across the whole codebase.
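
The cost being described can be made visible with a counter. This is a minimal sketch that assumes `java.util.logging` as a JDK-only stand-in for the SLF4J logger used in the patch; `GuardedLoggingSketch`, `describeChannel`, and `describeCalls` are hypothetical names introduced for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for the SLF4J logger.
final class GuardedLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(GuardedLoggingSketch.class.getName());

    /** Counts how often the log argument is actually computed. */
    static final AtomicInteger describeCalls = new AtomicInteger();

    static {
        LOG.setLevel(Level.INFO); // FINEST (roughly "trace") is disabled
    }

    /** Stands in for any non-trivial expression passed as a log parameter. */
    static String describeChannel(int id) {
        describeCalls.incrementAndGet();
        return "channel-" + id;
    }

    /** The argument is evaluated even though the message is then dropped. */
    static void logUnguarded(int id) {
        LOG.log(Level.FINEST, "Channel inactive {0}", describeChannel(id));
    }

    /** The level check skips both the argument evaluation and the log call. */
    static void logGuarded(int id) {
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.log(Level.FINEST, "Channel inactive {0}", describeChannel(id));
        }
    }

    public static void main(String[] args) {
        logUnguarded(1);
        logGuarded(2);
        System.out.println("argument evaluations: " + describeCalls.get());
    }
}
```

When the argument is already at hand, as with `ctx.channel()`, the saving is smaller, so the guard matters most where building the parameters is itself expensive.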


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226838972
  
--- Diff: 
zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations 
and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from 
https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). 
Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer 
leaks.
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
--- End diff --

Okay, very good


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226836043
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
 }
 }
 wakeupCnxn();
+// Note: SimpleChannelInboundHandler releases the ByteBuf for 
us
+// so we don't need to do it.
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx,
-ExceptionEvent e) throws Exception {
-LOG.warn("Exception caught: {}", e, e.getCause());
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
+LOG.warn("Exception caught", cause);
 cleanup();
 }
 }
+
+/**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ *  allocations.
+ */
+public static void setTestAllocator(ByteBufAllocator allocator) {
+TEST_ALLOCATOR.set(allocator);
--- End diff --

Yes, in my case this is a hole because the client code runs in a potentially 
insecure JVM where user code can call public methods.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226835980
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

See these very interesting slides 
http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#8.0


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226835913
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +108,95 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return connectFuture != null || channel != null;
+} finally {
+connectLock.unlock();
+}
 }
 
 @Override
 void connect(InetSocketAddress addr) throws IOException {
 firstConnect = new CountDownLatch(1);
 
-ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-bootstrap.setPipelineFactory(new 
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-bootstrap.setOption("soLinger", -1);
-bootstrap.setOption("tcpNoDelay", true);
-
-connectFuture = bootstrap.connect(addr);
-connectFuture.addListener(new ChannelFutureListener() {
-@Override
-public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
-// this lock guarantees that channel won't be assgined 
after cleanup().
-connectLock.lock();
-try {
-if (!channelFuture.isSuccess() || connectFuture == 
null) {
-LOG.info("future isn't success, cause: {}", 
channelFuture.getCause());
-return;
-}
-// setup channel, variables, connection, etc.
-channel = channelFuture.getChannel();
-
-disconnected.set(false);
-initialized = false;
-lenBuffer.clear();
-incomingBuffer = lenBuffer;
-
-sendThread.primeConnection();
-updateNow();
-updateLastSendAndHeard();
-
-if (sendThread.tunnelAuthInProgress()) {
-waitSasl.drainPermits();
-needSasl.set(true);
-sendPrimePacket();
-} else {
-needSasl.set(false);
-}
+Bootstrap bootstrap = new Bootstrap();
+bootstrap.group(Objects.requireNonNull(eventLoopGroup))
+.channel(NioSocketChannel.class)
+.handler(new ZKClientPipelineFactory(addr.getHostString(), 
addr.getPort()))
+.option(ChannelOption.SO_LINGER, -1)
+.option(ChannelOption.TCP_NODELAY, true);
+ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+if (testAllocator != null) {
+bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+}
+bootstrap.validate();
+
+connectLock.lock();
+try {
+connectFuture = bootstrap.connect(addr);
+connectFuture.addListener(new ChannelFutureListener() {
+@Override
+public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
+// this lock guarantees that channel won't be assigned 
after cleanup().
+connectLock.lock();
+try {
+if (!channelFuture.isSuccess()) {
+LOG.info("future isn't success, cause:", 
channelFuture.cause());
+return;
+} else if (connectFuture == null) {
+LOG.info("connect attempt cancelled");
+// If the connect attempt was cancelled but 
succeeded
+// anyway, make sure to close the channel, 
otherwise
+// we may leak a file descriptor.
+channelFuture.channel().close();
--- End diff --

That was my guess too. Let's keep it single, so it is okay with me.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-20 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226835891
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -68,18 +70,21 @@
 public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 private static final Logger LOG = 
LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
 
-ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-Executors.newCachedThreadPool(), 
Executors.newCachedThreadPool());
-Channel channel;
-CountDownLatch firstConnect;
-ChannelFuture connectFuture;
-Lock connectLock = new ReentrantLock();
-AtomicBoolean disconnected = new AtomicBoolean();
-AtomicBoolean needSasl = new AtomicBoolean();
-Semaphore waitSasl = new Semaphore(0);
+private final EventLoopGroup eventLoopGroup;
+private Channel channel;
+private CountDownLatch firstConnect;
+private ChannelFuture connectFuture;
+private final Lock connectLock = new ReentrantLock();
+private final AtomicBoolean disconnected = new AtomicBoolean();
+private final AtomicBoolean needSasl = new AtomicBoolean();
+private final Semaphore waitSasl = new Semaphore(0);
+
+private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+new AtomicReference<>(null);
 
 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
 this.clientConfig = clientConfig;
+eventLoopGroup = new NioEventLoopGroup(0, 
Executors.newCachedThreadPool());
--- End diff --

In almost all of the projects I know that use Netty, you try to use Epoll if 
it is available and then fall back to NIO.
I will be happy to create a JIRA and send a diff once we get this merged.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226757240
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future)
 CnxnChannelHandler channelHandler = new CnxnChannelHandler();
 
 NettyServerCnxnFactory() {
-bootstrap = new ServerBootstrap(
-new NioServerSocketChannelFactory(
-Executors.newCachedThreadPool(),
-Executors.newCachedThreadPool()));
-// parent channel
-bootstrap.setOption("reuseAddress", true);
-// child channels
-bootstrap.setOption("child.tcpNoDelay", true);
-/* set socket linger to off, so that socket close does not block */
-bootstrap.setOption("child.soLinger", -1);
-bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-@Override
-public ChannelPipeline getPipeline() throws Exception {
-ChannelPipeline p = Channels.pipeline();
-if (secure) {
-initSSL(p);
-}
-p.addLast("servercnxnfactory", channelHandler);
-
-return p;
-}
-});
 x509Util = new ClientX509Util();
+
+EventLoopGroup bossGroup = new NioEventLoopGroup(0, 
Executors.newCachedThreadPool());
--- End diff --

See comment above about making epoll optional based on a config option.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226757190
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
-}
+LOG.trace("Channel inactive caused close {}", cnxn);
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+LOG.warn("Exception caught", cause);
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
-if (LOG.isDebugEnabled()) {
-LOG.debug("Closing " + cnxn);
-}
+LOG.debug("Closing {}", cnxn);
 cnxn.close();
 }
 }
 
 @Override
-public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent e)
-throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("message received called " + e.getMessage());
-}
+public void userEventTriggered(ChannelHandlerContext ctx, Object 
evt) throws Exception {
 try {
-if (LOG.isDebugEnabled()) {
-LOG.debug("New message " + e.toString()
-+ " from " + ctx.getChannel());
-}
-NettyServerCnxn cnxn = 
(NettyServerCnxn)ctx.getAttachment();
-synchronized(cnxn) {
-processMessage(e, cnxn);
+if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+LOG.debug("Received AutoReadEvent.ENABLE");
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+// TODO(ilyam): Not sure if cnxn can be null here. It 
becomes null if channelInactive()
+// or exceptionCaught() trigger, but it's unclear to 
me if userEventTriggered() can run
+// after either of those. Check for null just to be 
safe ...
+if (cnxn != null) {
+cnxn.processQueuedBuffer();
+}
+ctx.channel().config().setAutoRead(true);
+} else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+LOG.debug("Received AutoReadEvent.DISABLE");
+ctx.channel().config().setAutoRead(false);
 }
-} catch(Exception ex) {
-LOG.error("Unexpected exception in 

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226757057
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
--- End diff --

LOG.trace() does an isTraceEnabled check internally. If the additional 
parameters passed to the log method don't do any work (such as converting the 
contents of a buffer to a hex string), then the enclosing isTraceEnabled check 
is redundant.
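
To illustrate the point, here is a minimal sketch. It uses java.util.logging 
as a stand-in for the SLF4J logger in the patch so that it needs no extra 
dependencies; the class and method names (LogGuardSketch, toStringCalls) are 
made up for this example. It counts how often the log argument is turned into 
a string while the FINEST level is disabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class LogGuardSketch {
    /** Counts toString() calls, i.e. how often the argument is actually formatted. */
    static final class CountingArg {
        int calls = 0;

        @Override
        public String toString() {
            calls++;
            return "channel-1";
        }
    }

    /**
     * Logs at FINEST on a logger whose level is INFO (so FINEST is disabled)
     * and returns how many times the argument was converted to a string.
     */
    static int toStringCalls(boolean eagerConcatenation) {
        Logger log = Logger.getLogger("LogGuardSketch");
        log.setUseParentHandlers(false); // keep the demo quiet
        log.setLevel(Level.INFO);
        CountingArg arg = new CountingArg();
        if (eagerConcatenation) {
            // The string is built before the logger gets a chance to check the level.
            log.log(Level.FINEST, "Channel inactive " + arg);
        } else {
            // The argument is passed through untouched; the logger checks the
            // level first and returns without formatting anything.
            log.log(Level.FINEST, "Channel inactive {0}", arg);
        }
        return arg.calls;
    }

    public static void main(String[] args) {
        System.out.println("parameterized: " + toStringCalls(false));
        System.out.println("concatenated:  " + toStringCalls(true));
    }
}
```

An explicit level check is still worth keeping when computing the argument 
itself is expensive, since the argument expression is evaluated before the 
log call either way.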


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756739
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java 
---
@@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
 this.sessionId = sessionId;
 }
 
-@Override
-public void enableRecv() {
-if (throttled) {
-throttled = false;
-if (LOG.isDebugEnabled()) {
-LOG.debug("Sending unthrottle event " + this);
-}
-channel.getPipeline().sendUpstream(new 
ResumeMessageEvent(channel));
-}
-}
-
 @Override
 public void sendBuffer(ByteBuffer sendBuffer) {
 if (sendBuffer == ServerCnxnFactory.closeConn) {
 close();
 return;
 }
-channel.write(wrappedBuffer(sendBuffer));
+channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
--- End diff --

As above, I'm not sure what that provides. I am still learning about netty 
so please excuse my ignorance :)


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756353
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
 }
 }
 wakeupCnxn();
+// Note: SimpleChannelInboundHandler releases the ByteBuf for 
us
+// so we don't need to do it.
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx,
-ExceptionEvent e) throws Exception {
-LOG.warn("Exception caught: {}", e, e.getCause());
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
+LOG.warn("Exception caught", cause);
 cleanup();
 }
 }
+
+/**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ *  allocations.
+ */
+public static void setTestAllocator(ByteBufAllocator allocator) {
+TEST_ALLOCATOR.set(allocator);
--- End diff --

Sure, but that would only affect that client and would have no effect on 
the server. All clients are untrusted by default since they run on a computer 
you don't own :)

I can look into using a mocking framework instead if you feel strongly 
about it.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756565
  
--- Diff: 
zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations 
and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from 
https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). 
Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer 
leaks.
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
--- End diff --

Paranoid leak detection will just print details about leaks, but the test 
will still pass and it takes a human to examine the test's stderr output to see 
that there was a problem. Using this allocator will make the test fail if there 
are buffer leaks.
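
The idea can be sketched roughly as follows. This is not the 
TestByteBufAllocator from the patch and does not use Netty at all; 
TrackedBuffer is a hypothetical stand-in for a reference-counted ByteBuf, and 
the only thing borrowed from the patch is the checkForLeaks() name. The 
tracker remembers every buffer it hands out and throws at the end of the test 
if any of them was never released, which is what turns a leak into a test 
failure.

```java
import java.util.ArrayList;
import java.util.List;

class LeakTrackerSketch {
    /** Minimal reference-counted buffer; a stand-in for Netty's ByteBuf. */
    static final class TrackedBuffer {
        private int refCnt = 1;

        void release() {
            if (refCnt > 0) {
                refCnt--;
            }
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Strong references to everything allocated so far. This is why such a
    // tracker must never be used outside of tests.
    private final List<TrackedBuffer> outstanding = new ArrayList<>();

    synchronized TrackedBuffer allocate() {
        TrackedBuffer buf = new TrackedBuffer();
        outstanding.add(buf);
        return buf;
    }

    /** Call at the end of a test: fails loudly if any buffer is still referenced. */
    synchronized void checkForLeaks() {
        int leaked = 0;
        for (TrackedBuffer buf : outstanding) {
            if (buf.refCnt() > 0) {
                leaked++;
            }
        }
        outstanding.clear();
        if (leaked > 0) {
            throw new AssertionError(leaked + " buffer(s) leaked");
        }
    }
}
```

A test would typically call checkForLeaks() from its teardown method, so a 
forgotten release() shows up as a failure rather than as a line in stderr.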


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226756052
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

Can you explain what the purpose of that would be? According to the 
documentation, voidPromise() returns a promise that will never be notified of 
success or failure.


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226755668
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -184,7 +213,9 @@ void cleanup() {
 
 @Override
 void close() {
-channelFactory.releaseExternalResources();
+if (!eventLoopGroup.isShuttingDown()) {
--- End diff --

I'm not sure if calling `shutdownGracefully` more than once is allowed, 
which is why I added the check. It might not be necessary.
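
For reference, the shape of the guard looks like this in a small sketch that 
uses a plain JDK ExecutorService instead of a Netty EventLoopGroup 
(IdempotentCloseSketch and shutdownCalls are names invented here). For the 
JDK type the guard is strictly optional, because ExecutorService.shutdown() 
is documented to have no additional effect if the executor is already shut 
down. Whether the same holds for shutdownGracefully() is the open question 
above, and this sketch does not answer it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentCloseSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Only here so the demo can observe how often shutdown was really requested.
    final AtomicInteger shutdownCalls = new AtomicInteger();

    /** Safe to call any number of times; only the first call triggers shutdown. */
    void close() {
        // Check-then-act is not atomic. That is fine when close() is called
        // from a single thread, which this sketch assumes.
        if (!executor.isShutdown()) {
            shutdownCalls.incrementAndGet();
            executor.shutdown();
        }
    }

    boolean isClosed() {
        return executor.isShutdown();
    }
}
```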


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226755419
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +108,95 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return connectFuture != null || channel != null;
+} finally {
+connectLock.unlock();
+}
 }
 
 @Override
 void connect(InetSocketAddress addr) throws IOException {
 firstConnect = new CountDownLatch(1);
 
-ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-bootstrap.setPipelineFactory(new 
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-bootstrap.setOption("soLinger", -1);
-bootstrap.setOption("tcpNoDelay", true);
-
-connectFuture = bootstrap.connect(addr);
-connectFuture.addListener(new ChannelFutureListener() {
-@Override
-public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
-// this lock guarantees that channel won't be assgined 
after cleanup().
-connectLock.lock();
-try {
-if (!channelFuture.isSuccess() || connectFuture == 
null) {
-LOG.info("future isn't success, cause: {}", 
channelFuture.getCause());
-return;
-}
-// setup channel, variables, connection, etc.
-channel = channelFuture.getChannel();
-
-disconnected.set(false);
-initialized = false;
-lenBuffer.clear();
-incomingBuffer = lenBuffer;
-
-sendThread.primeConnection();
-updateNow();
-updateLastSendAndHeard();
-
-if (sendThread.tunnelAuthInProgress()) {
-waitSasl.drainPermits();
-needSasl.set(true);
-sendPrimePacket();
-} else {
-needSasl.set(false);
-}
+Bootstrap bootstrap = new Bootstrap();
+bootstrap.group(Objects.requireNonNull(eventLoopGroup))
+.channel(NioSocketChannel.class)
+.handler(new ZKClientPipelineFactory(addr.getHostString(), 
addr.getPort()))
+.option(ChannelOption.SO_LINGER, -1)
+.option(ChannelOption.TCP_NODELAY, true);
+ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+if (testAllocator != null) {
+bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+}
+bootstrap.validate();
+
+connectLock.lock();
+try {
+connectFuture = bootstrap.connect(addr);
+connectFuture.addListener(new ChannelFutureListener() {
+@Override
+public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
+// this lock guarantees that channel won't be assigned 
after cleanup().
+connectLock.lock();
+try {
+if (!channelFuture.isSuccess()) {
+LOG.info("future isn't success, cause:", 
channelFuture.cause());
+return;
+} else if (connectFuture == null) {
+LOG.info("connect attempt cancelled");
+// If the connect attempt was cancelled but 
succeeded
+// anyway, make sure to close the channel, 
otherwise
+// we may leak a file descriptor.
+channelFuture.channel().close();
--- End diff --

I don't think so, since this code can only trigger if the connect future is 
successful. If the future is not successful, the previous if branch will be 
taken.
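
To make the branch order concrete, here is a stripped-down sketch of the 
listener logic with no Netty types. FakeChannel, pendingConnect, Outcome and 
onConnectComplete are hypothetical names standing in for the channel, 
connectFuture and operationComplete() in the diff. The failure branch is 
checked first, so the close() call is only reachable when the connect 
succeeded after cleanup() had already cancelled the attempt.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConnectListenerSketch {
    enum Outcome { FAILED, CANCELLED_AND_CLOSED, CONNECTED }

    /** Stand-in for a Netty Channel; only tracks whether close() was called. */
    static final class FakeChannel {
        boolean closed = false;

        void close() {
            closed = true;
        }
    }

    private final Lock connectLock = new ReentrantLock();
    private Object pendingConnect = new Object(); // non-null while an attempt is in flight
    private FakeChannel channel;

    /** Cancels the in-flight attempt; mirrors the role of cleanup() in the diff. */
    void cleanup() {
        connectLock.lock();
        try {
            pendingConnect = null;
            channel = null;
        } finally {
            connectLock.unlock();
        }
    }

    /** Mirrors the three-way decision made when the connect future completes. */
    Outcome onConnectComplete(boolean success, FakeChannel ch) {
        connectLock.lock();
        try {
            if (!success) {
                return Outcome.FAILED; // nothing was opened, nothing to close
            } else if (pendingConnect == null) {
                ch.close(); // succeeded after cancellation: avoid leaking the socket
                return Outcome.CANCELLED_AND_CLOSED;
            }
            channel = ch;
            return Outcome.CONNECTED;
        } finally {
            connectLock.unlock();
        }
    }

    FakeChannel channel() {
        return channel;
    }
}
```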


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread ivmaykov
Github user ivmaykov commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226755285
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -68,18 +70,21 @@
 public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 private static final Logger LOG = 
LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
 
-ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-Executors.newCachedThreadPool(), 
Executors.newCachedThreadPool());
-Channel channel;
-CountDownLatch firstConnect;
-ChannelFuture connectFuture;
-Lock connectLock = new ReentrantLock();
-AtomicBoolean disconnected = new AtomicBoolean();
-AtomicBoolean needSasl = new AtomicBoolean();
-Semaphore waitSasl = new Semaphore(0);
+private final EventLoopGroup eventLoopGroup;
+private Channel channel;
+private CountDownLatch firstConnect;
+private ChannelFuture connectFuture;
+private final Lock connectLock = new ReentrantLock();
+private final AtomicBoolean disconnected = new AtomicBoolean();
+private final AtomicBoolean needSasl = new AtomicBoolean();
+private final Semaphore waitSasl = new Semaphore(0);
+
+private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+new AtomicReference<>(null);
 
 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
 this.clientConfig = clientConfig;
+eventLoopGroup = new NioEventLoopGroup(0, 
Executors.newCachedThreadPool());
--- End diff --

I'd like to do it in a follow-up diff. I was thinking we default to NIO 
(since it works on all OSes), and have a config option to use Epoll instead.
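
As a rough sketch of what that selection could look like, with the caveat 
that the property name example.netty.useEpoll and the class 
TransportChoiceSketch are invented for this illustration and are not part of 
the patch: NIO stays the default, and Epoll is only chosen when it is both 
requested and plausible on the current OS. Real code would more likely ask 
Netty's Epoll.isAvailable() than inspect os.name; the OS check here just keeps 
the example free of Netty dependencies.

```java
import java.util.Locale;

class TransportChoiceSketch {
    enum Transport { NIO, EPOLL }

    /** NIO unless Epoll was explicitly requested and the OS looks like Linux. */
    static Transport choose(boolean epollRequested, String osName) {
        boolean linux = osName != null
                && osName.toLowerCase(Locale.ROOT).contains("linux");
        return (epollRequested && linux) ? Transport.EPOLL : Transport.NIO;
    }

    /** Reads the (hypothetical) config option from a system property. */
    static Transport fromSystemProperties() {
        return choose(Boolean.getBoolean("example.netty.useEpoll"),
                System.getProperty("os.name"));
    }

    public static void main(String[] args) {
        System.out.println("Selected transport: " + fromSystemProperties());
    }
}
```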


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226687650
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java 
---
@@ -200,24 +186,13 @@ public void setSessionId(long sessionId) {
 this.sessionId = sessionId;
 }
 
-@Override
-public void enableRecv() {
-if (throttled) {
-throttled = false;
-if (LOG.isDebugEnabled()) {
-LOG.debug("Sending unthrottle event " + this);
-}
-channel.getPipeline().sendUpstream(new 
ResumeMessageEvent(channel));
-}
-}
-
 @Override
 public void sendBuffer(ByteBuffer sendBuffer) {
 if (sendBuffer == ServerCnxnFactory.closeConn) {
 close();
 return;
 }
-channel.write(wrappedBuffer(sendBuffer));
+channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer));
--- End diff --

Consider using voidPromise()


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226684843
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -439,13 +466,34 @@ public void messageReceived(ChannelHandlerContext ctx,
 }
 }
 wakeupCnxn();
+// Note: SimpleChannelInboundHandler releases the ByteBuf for 
us
+// so we don't need to do it.
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx,
-ExceptionEvent e) throws Exception {
-LOG.warn("Exception caught: {}", e, e.getCause());
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
+LOG.warn("Exception caught", cause);
 cleanup();
 }
 }
+
+/**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ *  allocations.
+ */
+public static void setTestAllocator(ByteBufAllocator allocator) {
+TEST_ALLOCATOR.set(allocator);
--- End diff --

I think this is a security hole. We are in the client, so untrusted code 
may call this public method.
We should use mockito/powermock for this stuff.
Is there another way?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226685450
  
--- Diff: 
zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations 
and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from 
https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). 
Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer 
leaks.
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
--- End diff --

This is interesting

Netty already has built-in support for this kind of stuff. I see that this 
class is smarter.
Isn't running the tests with paranoid leak detection enough?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226690017
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext 
ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = 
ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = 
ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = 
sslHandler.handshakeFuture();
 handshakeFuture.addListener(new 
CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
+allChannels.remove(ctx.channel());
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnect caused close " + e);
-}
+LOG.trace("Channel inactive caused close {}", cnxn);
 cnxn.close();
 }
 }
 
 @Override
-public void exceptionCaught(ChannelHandlerContext ctx, 
ExceptionEvent e)
-throws Exception
-{
-LOG.warn("Exception caught " + e, e.getCause());
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
+LOG.warn("Exception caught", cause);
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
 if (cnxn != null) {
-if (LOG.isDebugEnabled()) {
-LOG.debug("Closing " + cnxn);
-}
+LOG.debug("Closing {}", cnxn);
 cnxn.close();
 }
 }
 
 @Override
-public void messageReceived(ChannelHandlerContext ctx, 
MessageEvent e)
-throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("message received called " + e.getMessage());
-}
+public void userEventTriggered(ChannelHandlerContext ctx, Object 
evt) throws Exception {
 try {
-if (LOG.isDebugEnabled()) {
-LOG.debug("New message " + e.toString()
-+ " from " + ctx.getChannel());
-}
-NettyServerCnxn cnxn = 
(NettyServerCnxn)ctx.getAttachment();
-synchronized(cnxn) {
-processMessage(e, cnxn);
+if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+LOG.debug("Received AutoReadEvent.ENABLE");
+NettyServerCnxn cnxn = 
ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+// TODO(ilyam): Not sure if cnxn can be null here. It 
becomes null if channelInactive()
+// or exceptionCaught() trigger, but it's unclear to 
me if userEventTriggered() can run
+// after either of those. Check for null just to be 
safe ...
+if (cnxn != null) {
+cnxn.processQueuedBuffer();
+}
+ctx.channel().config().setAutoRead(true);
+} else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+LOG.debug("Received AutoReadEvent.DISABLE");
+ctx.channel().config().setAutoRead(false);
 }
-} catch(Exception ex) {
-LOG.error("Unexpected exception 

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226684209
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -267,7 +298,7 @@ private void sendPkt(Packet p) {
 p.createBB();
 updateLastSend();
 sentCount++;
-channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+channel.writeAndFlush(Unpooled.wrappedBuffer(p.bb));
--- End diff --

What about adding ', channel.voidPromise()' ?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226683922
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -184,7 +213,9 @@ void cleanup() {
 
 @Override
 void close() {
-channelFactory.releaseExternalResources();
+if (!eventLoopGroup.isShuttingDown()) {
--- End diff --

Is this really needed?


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226681741
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -68,18 +70,21 @@
 public class ClientCnxnSocketNetty extends ClientCnxnSocket {
 private static final Logger LOG = 
LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
 
-ChannelFactory channelFactory = new NioClientSocketChannelFactory(
-Executors.newCachedThreadPool(), 
Executors.newCachedThreadPool());
-Channel channel;
-CountDownLatch firstConnect;
-ChannelFuture connectFuture;
-Lock connectLock = new ReentrantLock();
-AtomicBoolean disconnected = new AtomicBoolean();
-AtomicBoolean needSasl = new AtomicBoolean();
-Semaphore waitSasl = new Semaphore(0);
+private final EventLoopGroup eventLoopGroup;
+private Channel channel;
+private CountDownLatch firstConnect;
+private ChannelFuture connectFuture;
+private final Lock connectLock = new ReentrantLock();
+private final AtomicBoolean disconnected = new AtomicBoolean();
+private final AtomicBoolean needSasl = new AtomicBoolean();
+private final Semaphore waitSasl = new Semaphore(0);
+
+private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+new AtomicReference<>(null);
 
 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
 this.clientConfig = clientConfig;
+eventLoopGroup = new NioEventLoopGroup(0, 
Executors.newCachedThreadPool());
--- End diff --

Let's move to Epoll.
It can be a follow-up change (I can send it if you don't already have it on 
your stack of changes).


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226690548
  
--- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
 ---
@@ -335,29 +260,34 @@ public void operationComplete(ChannelFuture future)
 CnxnChannelHandler channelHandler = new CnxnChannelHandler();
 
 NettyServerCnxnFactory() {
-bootstrap = new ServerBootstrap(
-new NioServerSocketChannelFactory(
-Executors.newCachedThreadPool(),
-Executors.newCachedThreadPool()));
-// parent channel
-bootstrap.setOption("reuseAddress", true);
-// child channels
-bootstrap.setOption("child.tcpNoDelay", true);
-/* set socket linger to off, so that socket close does not block */
-bootstrap.setOption("child.soLinger", -1);
-bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-@Override
-public ChannelPipeline getPipeline() throws Exception {
-ChannelPipeline p = Channels.pipeline();
-if (secure) {
-initSSL(p);
-}
-p.addLast("servercnxnfactory", channelHandler);
-
-return p;
-}
-});
 x509Util = new ClientX509Util();
+
+EventLoopGroup bossGroup = new NioEventLoopGroup(0, 
Executors.newCachedThreadPool());
--- End diff --

Consider EPoll


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226682998
  
--- Diff: 
zookeeper-common/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
@@ -103,71 +108,95 @@
 boolean isConnected() {
 // Assuming that isConnected() is only used to initiate connection,
 // not used by some other connection status judgement.
-return channel != null;
+connectLock.lock();
+try {
+return connectFuture != null || channel != null;
+} finally {
+connectLock.unlock();
+}
 }
 
 @Override
 void connect(InetSocketAddress addr) throws IOException {
 firstConnect = new CountDownLatch(1);
 
-ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
-bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
-bootstrap.setOption("soLinger", -1);
-bootstrap.setOption("tcpNoDelay", true);
-
-connectFuture = bootstrap.connect(addr);
-connectFuture.addListener(new ChannelFutureListener() {
-@Override
-public void operationComplete(ChannelFuture channelFuture) throws Exception {
-// this lock guarantees that channel won't be assgined after cleanup().
-connectLock.lock();
-try {
-if (!channelFuture.isSuccess() || connectFuture == null) {
-LOG.info("future isn't success, cause: {}", channelFuture.getCause());
-return;
-}
-// setup channel, variables, connection, etc.
-channel = channelFuture.getChannel();
-
-disconnected.set(false);
-initialized = false;
-lenBuffer.clear();
-incomingBuffer = lenBuffer;
-
-sendThread.primeConnection();
-updateNow();
-updateLastSendAndHeard();
-
-if (sendThread.tunnelAuthInProgress()) {
-waitSasl.drainPermits();
-needSasl.set(true);
-sendPrimePacket();
-} else {
-needSasl.set(false);
-}
+Bootstrap bootstrap = new Bootstrap();
+bootstrap.group(Objects.requireNonNull(eventLoopGroup))
+.channel(NioSocketChannel.class)
+.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()))
+.option(ChannelOption.SO_LINGER, -1)
+.option(ChannelOption.TCP_NODELAY, true);
+ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+if (testAllocator != null) {
+bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+}
+bootstrap.validate();
+
+connectLock.lock();
+try {
+connectFuture = bootstrap.connect(addr);
+connectFuture.addListener(new ChannelFutureListener() {
+@Override
+public void operationComplete(ChannelFuture channelFuture) throws Exception {
+// this lock guarantees that channel won't be assigned after cleanup().
+connectLock.lock();
+try {
+if (!channelFuture.isSuccess()) {
+LOG.info("future isn't success, cause:", channelFuture.cause());
+return;
+} else if (connectFuture == null) {
+LOG.info("connect attempt cancelled");
+// If the connect attempt was cancelled but succeeded
+// anyway, make sure to close the channel, otherwise
+// we may leak a file descriptor.
+channelFuture.channel().close();
--- End diff --

Can this turn into an NPE, given that channel() may return null?
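
For context, the hunk above takes `connectLock`, treats `connectFuture == null` as "the attempt was cancelled", and closes a channel that connected anyway so that no file descriptor leaks. Below is a minimal, dependency-free sketch of that pattern. `ConnectTracker` and `FakeChannel` are hypothetical stand-ins (a `CompletableFuture` plays the role of Netty's `ChannelFuture`), and the null guard illustrates the defensive option raised in the question; it is not a claim about what `channel()` can return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of "close the channel if connect was cancelled". */
final class ConnectTracker {
    /** Stand-in for a network channel that owns a file descriptor. */
    static final class FakeChannel {
        private volatile boolean open = true;
        void close() { open = false; }
        boolean isOpen() { return open; }
    }

    private final ReentrantLock connectLock = new ReentrantLock();
    private CompletableFuture<FakeChannel> connectFuture; // guarded by connectLock
    private FakeChannel channel;                          // guarded by connectLock

    /** Registers a connect attempt; the caller completes the returned future. */
    CompletableFuture<FakeChannel> connect() {
        CompletableFuture<FakeChannel> future = new CompletableFuture<>();
        connectLock.lock();
        try {
            connectFuture = future;
        } finally {
            connectLock.unlock();
        }
        future.whenComplete(this::onConnectDone);
        return future;
    }

    /** Cancels any pending attempt and closes the current channel, if any. */
    void cleanup() {
        connectLock.lock();
        try {
            connectFuture = null;
            if (channel != null) {
                channel.close();
                channel = null;
            }
        } finally {
            connectLock.unlock();
        }
    }

    boolean isConnected() {
        connectLock.lock();
        try {
            return connectFuture != null || channel != null;
        } finally {
            connectLock.unlock();
        }
    }

    private void onConnectDone(FakeChannel ch, Throwable cause) {
        // The lock guarantees the channel is never assigned after cleanup().
        connectLock.lock();
        try {
            if (cause != null) {
                return; // connect failed, nothing to close
            }
            if (connectFuture == null) {
                // Cancelled but connected anyway: close defensively.
                if (ch != null) {
                    ch.close();
                }
                return;
            }
            channel = ch;
        } finally {
            connectFuture = null;
            connectLock.unlock();
        }
    }
}
```

The null check costs one branch and makes the cancelled path safe regardless of what the future hands back, which is why it is a common answer to this kind of review question.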


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-19 Thread eolivelli
Github user eolivelli commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r226689302
  
--- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---
@@ -116,170 +115,94 @@ public void channelConnected(ChannelHandlerContext ctx,
 
 NettyServerCnxn cnxn = new NettyServerCnxn(channel,
 zkServer, NettyServerCnxnFactory.this);
-ctx.setAttachment(cnxn);
+ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
 
 if (secure) {
-SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
-ChannelFuture handshakeFuture = sslHandler.handshake();
+SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
 handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
 } else {
-allChannels.add(ctx.getChannel());
+allChannels.add(ctx.channel());
 addCnxn(cnxn);
 }
 }
 
 @Override
-public void channelDisconnected(ChannelHandlerContext ctx,
-ChannelStateEvent e) throws Exception
-{
-if (LOG.isTraceEnabled()) {
-LOG.trace("Channel disconnected " + e);
-}
-NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+LOG.trace("Channel inactive {}", ctx.channel());
--- End diff --

Is the isTraceEnabled() check missing here?
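
One point worth spelling out: with SLF4J-style `{}` placeholders the message is only formatted when the level is enabled, so an explicit guard mainly pays off when computing the argument is itself expensive. The removed line concatenated strings (`"Channel disconnected " + e`), which formats eagerly and therefore did need the guard. The sketch below demonstrates the difference without any logging dependency; `MiniLogger` and `CountingChannel` are hypothetical stand-ins, not SLF4J or Netty types.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: placeholder logging defers toString(), concatenation does not. */
final class LazyTraceDemo {
    /** Minimal logger that mimics "{}" placeholder semantics. */
    static final class MiniLogger {
        private final boolean traceEnabled;
        private final StringBuilder out = new StringBuilder();

        MiniLogger(boolean traceEnabled) { this.traceEnabled = traceEnabled; }

        boolean isTraceEnabled() { return traceEnabled; }

        /** Parameterized form: the argument is formatted only if trace is on. */
        void trace(String format, Object arg) {
            if (!traceEnabled) {
                return;
            }
            out.append(format.replace("{}", String.valueOf(arg))).append('\n');
        }

        /** Plain form: the caller has already built the full message. */
        void trace(String message) {
            if (traceEnabled) {
                out.append(message).append('\n');
            }
        }

        String output() { return out.toString(); }
    }

    /** Counts toString() calls so the formatting cost is observable. */
    static final class CountingChannel {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "[id: 0x1]";
        }
    }

    public static void main(String[] args) {
        MiniLogger log = new MiniLogger(false);
        CountingChannel ch = new CountingChannel();
        log.trace("Channel inactive {}", ch);   // no toString() call
        log.trace("Channel disconnected " + ch); // toString() runs anyway
        System.out.println("toString calls: " + ch.toStringCalls.get());
    }
}
```

In the hunk under review the argument is `ctx.channel()`, a plain accessor, so the unguarded parameterized call should be cheap when trace logging is off.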


---


[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-15 Thread dain
Github user dain commented on a diff in the pull request:

https://github.com/apache/zookeeper/pull/669#discussion_r225332620
  
--- Diff: zookeeper-common/src/test/java/org/apache/zookeeper/common/TestByteBufAllocator.java ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer leaks.
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
+private static AtomicReference<TestByteBufAllocator> INSTANCE =
+new AtomicReference<>(null);
+
+/**
+ * Get the singleton testing allocator.
+ * @return the singleton allocator, creating it if one does not exist.
+ */
+public static TestByteBufAllocator getInstance() {
+TestByteBufAllocator result = INSTANCE.get();
+if (result == null) {
+// Note: the leak detector level never gets reset after this,
+// but that's probably ok since this is only used by test code.
+ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+INSTANCE.compareAndSet(null, new TestByteBufAllocator());
+result = INSTANCE.get();
+}
+return result;
+}
+
+/**
+ * Destroys the singleton testing allocator and throws an error if any of the
+ * buffers allocated by it have been leaked. Attempts to print leak details to
+ * standard error before throwing, by using netty's built-in leak tracking.
+ * Note that this might not always work, since it only triggers when a buffer
+ * is garbage-collected and calling System.gc() does not guarantee that a buffer
+ * will actually be GC'ed.
+ *
+ * This should be called at the end of a unit test's tearDown() method.
+ */
+public static void checkForLeaks() {
+TestByteBufAllocator result = INSTANCE.getAndSet(null);
+if (result != null) {
+result.checkInstanceForLeaks();
+}
+
+}
+
+private final List<ByteBuf> trackedBuffers = new ArrayList<>();
+
+public TestByteBufAllocator()
+{
+super(false);
+}
+
+@Override
+protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newHeapBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
+{
+return track(super.newDirectBuffer(initialCapacity, maxCapacity));
+}
+
+@Override
+public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+{
+return track(super.compositeHeapBuffer(maxNumComponents));
+}
+
+@Override
+public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
+{
+
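
The class Javadoc quoted above describes an allocator that holds a strong reference to every buffer it hands out and fails the test if any of them is still unreleased at teardown. A minimal sketch of that idea, without Netty, might look like the following. `TrackingAllocatorSketch` and `Buf` are hypothetical stand-ins for the allocator and for a reference-counted `ByteBuf`, and treating "reference count above zero" as a leak is an assumption of this sketch rather than something the quoted diff shows.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a test-only allocator that detects unreleased buffers. */
final class TrackingAllocatorSketch {
    /** Stand-in for a reference-counted buffer. */
    static final class Buf {
        private int refCnt = 1;

        synchronized int refCnt() { return refCnt; }

        synchronized void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("buffer already released");
            }
            refCnt--;
        }
    }

    // Strong references on purpose: this is why such a class is test-only.
    private final List<Buf> tracked = new ArrayList<>();

    /** Allocates a buffer and remembers it for the later leak check. */
    synchronized Buf allocate() {
        Buf buf = new Buf();
        tracked.add(buf);
        return buf;
    }

    /** Call from tearDown(): throws if any tracked buffer was never released. */
    synchronized void checkForLeaks() {
        long leaked = tracked.stream().filter(b -> b.refCnt() > 0).count();
        tracked.clear();
        if (leaked > 0) {
            throw new AssertionError(leaked + " buffer(s) leaked");
        }
    }

    public static void main(String[] args) {
        TrackingAllocatorSketch allocator = new TrackingAllocatorSketch();
        allocator.allocate().release();
        allocator.checkForLeaks(); // passes: the only buffer was released
        System.out.println("no leaks detected");
    }
}
```

Clearing the list inside `checkForLeaks()` keeps one failing test from poisoning the next one, at the cost of losing the leaked buffers for later inspection.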

[GitHub] zookeeper pull request #669: ZOOKEEPER-3152: Port ZK netty stack to netty4

2018-10-15 Thread ivmaykov
GitHub user ivmaykov opened a pull request:

https://github.com/apache/zookeeper/pull/669

ZOOKEEPER-3152: Port ZK netty stack to netty4

Summary: Ported the client connection netty stack from netty3 to netty4. 
This includes both the server side (NettyServerCnxn and friends) and the client 
side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

FB Reviewers: nixon

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ivmaykov/zookeeper ZOOKEEPER-3152

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/zookeeper/pull/669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #669


commit 34c3e275d012bd14c243633a638c85f1ca4a36c4
Author: Ilya Maykov 
Date:   2018-08-31T23:26:55Z

port ZK netty stack from netty3 to netty4

Summary:
Ported the client connection netty stack from netty3 to netty4. This 
includes both the server side
(NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).

Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus 
manual testing on a regional ensemble.

Reviewers: nixon, nwolchko, nedelchev

Subscribers:

Differential Revision: https://phabricator.intern.facebook.com/D9646262

Tasks:

Tags:

Blame Revision:




---