HBASE-16654 Better handle channelInactive and close for netty rpc client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5568929d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5568929d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5568929d Branch: refs/heads/hbase-14439 Commit: 5568929dd2bfc20c1aa15d37d318459888cbd32a Parents: c67983e Author: zhangduo <zhang...@apache.org> Authored: Mon Sep 19 20:52:46 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Sep 21 22:36:47 2016 +0800 ---------------------------------------------------------------------- .../hbase/ipc/BufferCallBeforeInitHandler.java | 6 +- .../hadoop/hbase/ipc/NettyRpcConnection.java | 6 +- .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 9 +- .../hbase/security/AsyncHBaseSaslRpcClient.java | 58 -------- .../AsyncHBaseSaslRpcClientHandler.java | 135 ------------------ .../hbase/security/NettyHBaseSaslRpcClient.java | 58 ++++++++ .../NettyHBaseSaslRpcClientHandler.java | 142 +++++++++++++++++++ .../hbase/security/SaslUnwrapHandler.java | 1 + .../hadoop/hbase/security/SaslWrapHandler.java | 46 +++--- 9 files changed, 241 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java index 573ddd5..c628c31 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -62,13 +62,16 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH, null); - private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>(); + private final Map<Integer, Call> id2Call = new HashMap<>(); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof Call) { Call call = (Call) msg; id2Call.put(call.id, call); + // The call is already in track so here we set the write operation as success. + // We will fail the call directly if we can not write it out. + promise.trySuccess(); } else { ctx.write(msg, promise); } @@ -99,5 +102,4 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler { ctx.fireUserEventTriggered(evt); } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 559b7f9..8a85580 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler; +import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; import org.apache.hadoop.hbase.util.Threads; @@ -190,7 +190,7 @@ class NettyRpcConnection extends RpcConnection { Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); ChannelHandler saslHandler; try { - saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, + saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token, serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get( "hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase())); } catch (IOException e) { @@ -205,7 +205,7 @@ class NettyRpcConnection extends RpcConnection { if (future.isSuccess()) { ChannelPipeline p = ch.pipeline(); p.remove(SaslChallengeDecoder.class); - p.remove(AsyncHBaseSaslRpcClientHandler.class); + p.remove(NettyHBaseSaslRpcClientHandler.class); established(ch); } else { final Throwable error = future.cause(); http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 1cd89d8..5faaede 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -204,13 +204,18 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - cleanupCalls(ctx, new IOException("Connection closed")); + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, new IOException("Connection closed")); + } conn.shutdown(); + ctx.fireChannelInactive(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cleanupCalls(ctx, IPCUtil.toIOE(cause)); + if (!id2Call.isEmpty()) { + cleanupCalls(ctx, IPCUtil.toIOE(cause)); + } conn.shutdown(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java deleted file mode 100644 index df703dc..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security; - -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import java.io.IOException; - -import javax.security.sasl.Sasl; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** - * Implement SASL logic for async rpc client. - */ -@InterfaceAudience.Private -public class AsyncHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { - private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClient.class); - - public AsyncHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, - String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { - super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); - } - - public void setupSaslHandler(ChannelPipeline p) { - String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client context established. Negotiated QoP: " + qop); - } - if (qop == null || "auth".equalsIgnoreCase(qop)) { - return; - } - // add wrap and unwrap handlers to pipeline. - p.addFirst(new SaslWrapHandler(saslClient), - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), - new SaslUnwrapHandler(saslClient)); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java deleted file mode 100644 index bccfa30..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.security; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; - -/** - * Implement SASL logic for async rpc client. - */ -@InterfaceAudience.Private -public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { - - private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClientHandler.class); - - private final Promise<Boolean> saslPromise; - - private final UserGroupInformation ugi; - - private final AsyncHBaseSaslRpcClient saslRpcClient; - - /** - * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to - * simple. - */ - public AsyncHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, - AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal, - boolean fallbackAllowed, String rpcProtection) throws IOException { - this.saslPromise = saslPromise; - this.ugi = ugi; - this.saslRpcClient = new AsyncHBaseSaslRpcClient(method, token, serverPrincipal, - fallbackAllowed, rpcProtection); - } - - private void writeResponse(ChannelHandlerContext ctx, byte[] response) { - if (LOG.isDebugEnabled()) { - LOG.debug("Will send token of size " + response.length + " from initSASLContext."); - } - ctx.writeAndFlush( - ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); - } - - private void tryComplete(ChannelHandlerContext ctx) { - if (!saslRpcClient.isComplete()) { - return; - } - saslRpcClient.setupSaslHandler(ctx.pipeline()); - saslPromise.setSuccess(true); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - try { - byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { - - @Override - public byte[] run() throws Exception { - return saslRpcClient.getInitialResponse(); - } - }); - if (initialResponse != null) { - writeResponse(ctx, initialResponse); - } - tryComplete(ctx); - } catch (Exception e) { - // the exception thrown by handlerAdded will not be passed to the exceptionCaught below - // because netty will remove a handler if handlerAdded throws an exception. - exceptionCaught(ctx, e); - } - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - int len = msg.readInt(); - if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { - saslRpcClient.dispose(); - if (saslRpcClient.fallbackAllowed) { - saslPromise.setSuccess(false); - } else { - saslPromise.setFailure(new FallbackDisallowedException()); - } - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Will read input token of size " + len + " for processing by initSASLContext"); - } - final byte[] challenge = new byte[len]; - msg.readBytes(challenge); - byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { - - @Override - public byte[] run() throws Exception { - return saslRpcClient.evaluateChallenge(challenge); - } - }); - if (response != null) { - writeResponse(ctx, response); - } - tryComplete(ctx); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - saslRpcClient.dispose(); - saslPromise.setFailure(cause); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java new file mode 100644 index 0000000..f624608 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import java.io.IOException; + +import javax.security.sasl.Sasl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Implement SASL logic for netty rpc client. + */ +@InterfaceAudience.Private +public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClient.class); + + public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token, + String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { + super(method, token, serverPrincipal, fallbackAllowed, rpcProtection); + } + + public void setupSaslHandler(ChannelPipeline p) { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client context established. Negotiated QoP: " + qop); + } + if (qop == null || "auth".equalsIgnoreCase(qop)) { + return; + } + // add wrap and unwrap handlers to pipeline. + p.addFirst(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), + new SaslUnwrapHandler(saslClient)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java new file mode 100644 index 0000000..50609b4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +/** + * Implement SASL logic for netty rpc client. + */ +@InterfaceAudience.Private +public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class); + + private final Promise<Boolean> saslPromise; + + private final UserGroupInformation ugi; + + private final NettyHBaseSaslRpcClient saslRpcClient; + + /** + * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to + * simple. + */ + public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, + AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal, + boolean fallbackAllowed, String rpcProtection) throws IOException { + this.saslPromise = saslPromise; + this.ugi = ugi; + this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal, + fallbackAllowed, rpcProtection); + } + + private void writeResponse(ChannelHandlerContext ctx, byte[] response) { + if (LOG.isDebugEnabled()) { + LOG.debug("Will send token of size " + response.length + " from initSASLContext."); + } + ctx.writeAndFlush( + ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response)); + } + + private void tryComplete(ChannelHandlerContext ctx) { + if (!saslRpcClient.isComplete()) { + return; + } + saslRpcClient.setupSaslHandler(ctx.pipeline()); + saslPromise.setSuccess(true); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + try { + byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + + @Override + public byte[] run() throws Exception { + return saslRpcClient.getInitialResponse(); + } + }); + if (initialResponse != null) { + writeResponse(ctx, initialResponse); + } + tryComplete(ctx); + } catch (Exception e) { + // the exception thrown by handlerAdded will not be passed to the exceptionCaught below + // because netty will remove a handler if handlerAdded throws an exception. + exceptionCaught(ctx, e); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + int len = msg.readInt(); + if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) { + saslRpcClient.dispose(); + if (saslRpcClient.fallbackAllowed) { + saslPromise.trySuccess(false); + } else { + saslPromise.tryFailure(new FallbackDisallowedException()); + } + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Will read input token of size " + len + " for processing by initSASLContext"); + } + final byte[] challenge = new byte[len]; + msg.readBytes(challenge); + byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() { + + @Override + public byte[] run() throws Exception { + return saslRpcClient.evaluateChallenge(challenge); + } + }); + if (response != null) { + writeResponse(ctx, response); + } + tryComplete(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslRpcClient.dispose(); + saslPromise.tryFailure(new IOException("Connection closed")); + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + saslRpcClient.dispose(); + saslPromise.tryFailure(cause); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java index c2faf91..e631478 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java @@ -42,6 +42,7 @@ public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { SaslUtil.safeDispose(saslClient); + ctx.fireChannelInactive(); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java index fefb4f8..14ecf2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java @@ -26,6 +26,8 @@ import io.netty.channel.CoalescingBufferQueue; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.PromiseCombiner; +import java.io.IOException; + import javax.security.sasl.SaslClient; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -40,6 +42,10 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { private CoalescingBufferQueue queue; + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { queue = new CoalescingBufferQueue(ctx.channel()); @@ -55,29 +61,26 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { } } - public SaslWrapHandler(SaslClient saslClient) { - this.saslClient = saslClient; - } - @Override public void flush(ChannelHandlerContext ctx) throws Exception { + if (queue.isEmpty()) { + return; + } ByteBuf buf = null; try { - if (!queue.isEmpty()) { - ChannelPromise promise = ctx.newPromise(); - int readableBytes = queue.readableBytes(); - buf = queue.remove(readableBytes, promise); - byte[] bytes = new byte[readableBytes]; - buf.readBytes(bytes); - byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); - ChannelPromise lenPromise = ctx.newPromise(); - ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); - ChannelPromise contentPromise = ctx.newPromise(); - ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); - PromiseCombiner combiner = new PromiseCombiner(); - combiner.addAll(lenPromise, contentPromise); - combiner.finish(promise); - } + ChannelPromise promise = ctx.newPromise(); + int readableBytes = queue.readableBytes(); + buf = queue.remove(readableBytes, promise); + byte[] bytes = new byte[readableBytes]; + buf.readBytes(bytes); + byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length); + ChannelPromise lenPromise = ctx.newPromise(); + ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise); + ChannelPromise contentPromise = ctx.newPromise(); + ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise); + PromiseCombiner combiner = new PromiseCombiner(); + combiner.addAll(lenPromise, contentPromise); + combiner.finish(promise); ctx.flush(); } finally { if (buf != null) { @@ -88,6 +91,9 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - queue.releaseAndFailAll(new Throwable("Closed")); + if (!queue.isEmpty()) { + queue.releaseAndFailAll(new IOException("Connection closed")); + } + ctx.close(promise); } }