HBASE-16335 RpcClient under heavy load leaks some netty bytebuf (Ram)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5b8aaba
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5b8aaba
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5b8aaba

Branch: refs/heads/hbase-12439
Commit: c5b8aababe18f65f5db979128a62d8a0686b9dc5
Parents: 6eb6225
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Mon Sep 19 16:12:15 2016 +0530
Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Committed: Mon Sep 19 16:12:15 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  4 ++
 .../hadoop/hbase/ipc/BlockingRpcConnection.java |  5 +++
 .../hadoop/hbase/ipc/NettyRpcConnection.java    | 11 +++++
 .../apache/hadoop/hbase/ipc/RpcConnection.java  |  5 +++
 .../hadoop/hbase/security/SaslWrapHandler.java  | 43 +++++++++++++-------
 5 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 401a240..990ffe0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -215,6 +215,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
         if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
           LOG.info("Cleanup idle connection to " + conn.remoteId().address);
           connections.removeValue(conn.remoteId(), conn);
+          conn.cleanupConnection();
         }
       }
     }
@@ -472,6 +473,9 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
       conn.shutdown();
     }
     closeInternal();
+    for (T conn : connToClose) {
+      conn.cleanupConnection();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index c8b366d..528b726 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -685,6 +685,11 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
   }
 
   @Override
+  public void cleanupConnection() {
+    // do nothing
+  }
+
+  @Override
   public synchronized void sendRequest(final Call call, HBaseRpcController 
pcrc)
       throws IOException {
     pcrc.notifyOnCancel(new RpcCallback<Object>() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 5f22dfd..559b7f9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -36,6 +36,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
@@ -119,6 +120,16 @@ class NettyRpcConnection extends RpcConnection {
     shutdown0();
   }
 
+  @Override
+  public synchronized void cleanupConnection() {
+    if (connectionHeaderPreamble != null) {
+      ReferenceCountUtil.safeRelease(connectionHeaderPreamble);
+    }
+    if (connectionHeaderWithLength != null) {
+      ReferenceCountUtil.safeRelease(connectionHeaderWithLength);
+    }
+  }
+
   private void established(Channel ch) {
     ch.write(connectionHeaderWithLength.retainedDuplicate());
     ChannelPipeline p = ch.pipeline();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 8118b20..5e9e97e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -252,4 +252,9 @@ abstract class RpcConnection {
   public abstract void shutdown();
 
   public abstract void sendRequest(Call call, HBaseRpcController hrc) throws 
IOException;
+
+  /**
+   * Does the clean up work after the connection is removed from the 
connection pool
+   */
+  public abstract void cleanupConnection();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c5b8aaba/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index ddb4ae9..fefb4f8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.CoalescingBufferQueue;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.PromiseCombiner;
 
 import javax.security.sasl.SaslClient;
@@ -60,21 +61,33 @@ public class SaslWrapHandler extends 
ChannelOutboundHandlerAdapter {
 
   @Override
   public void flush(ChannelHandlerContext ctx) throws Exception {
-    if (!queue.isEmpty()) {
-      ChannelPromise promise = ctx.newPromise();
-      int readableBytes = queue.readableBytes();
-      ByteBuf buf = queue.remove(readableBytes, promise);
-      byte[] bytes = new byte[readableBytes];
-      buf.readBytes(bytes);
-      byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
-      ChannelPromise lenPromise = ctx.newPromise();
-      ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), 
lenPromise);
-      ChannelPromise contentPromise = ctx.newPromise();
-      ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
-      PromiseCombiner combiner = new PromiseCombiner();
-      combiner.addAll(lenPromise, contentPromise);
-      combiner.finish(promise);
+    ByteBuf buf = null;
+    try {
+      if (!queue.isEmpty()) {
+        ChannelPromise promise = ctx.newPromise();
+        int readableBytes = queue.readableBytes();
+        buf = queue.remove(readableBytes, promise);
+        byte[] bytes = new byte[readableBytes];
+        buf.readBytes(bytes);
+        byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+        ChannelPromise lenPromise = ctx.newPromise();
+        ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), 
lenPromise);
+        ChannelPromise contentPromise = ctx.newPromise();
+        ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+        PromiseCombiner combiner = new PromiseCombiner();
+        combiner.addAll(lenPromise, contentPromise);
+        combiner.finish(promise);
+      }
+      ctx.flush();
+    } finally {
+      if (buf != null) {
+        ReferenceCountUtil.safeRelease(buf);
+      }
     }
-    ctx.flush();
+  }
+
+  @Override
+  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws 
Exception {
+    queue.releaseAndFailAll(new Throwable("Closed"));
   }
 }

Reply via email to the mailing list.