http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java deleted file mode 100644 index 5f4d2f4..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ /dev/null @@ -1,510 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcChannel; -import com.google.protobuf.RpcController; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.epoll.EpollSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.JVM; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PoolMap; -import org.apache.hadoop.hbase.util.Threads; - -/** - * Netty 
client for the requests and responses - */ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class AsyncRpcClient extends AbstractRpcClient { - - private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - - public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; - public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport"; - public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup"; - - private static final HashedWheelTimer WHEEL_TIMER = - new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"), - 100, TimeUnit.MILLISECONDS); - - private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER = - new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - //empty initializer - } - }; - - protected final AtomicInteger callIdCnt = new AtomicInteger(); - - private final PoolMap<Integer, AsyncRpcChannel> connections; - - final FailedServers failedServers; - - @VisibleForTesting - final Bootstrap bootstrap; - - private final boolean useGlobalEventLoopGroup; - - @VisibleForTesting - static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP; - - synchronized static Pair<EventLoopGroup, Class<? extends Channel>> - getGlobalEventLoopGroup(Configuration conf) { - if (GLOBAL_EVENT_LOOP_GROUP == null) { - GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Create global event loop group " - + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName()); - } - } - return GLOBAL_EVENT_LOOP_GROUP; - } - - private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup( - Configuration conf) { - // Max amount of threads to use. 0 lets Netty decide based on amount of cores - int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); - - // Config to enable native transport. 
Does not seem to be stable at time of implementation - // although it is not extensively tested. - boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false); - - // Use the faster native epoll transport mechanism on linux if enabled - if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads); - } - return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads, - Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads); - } - return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads, - Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class); - } - } - - /** - * Constructor for tests - * - * @param configuration to HBase - * @param clusterId for the cluster - * @param localAddress local address to connect to - * @param metrics the connection metrics - * @param channelInitializer for custom channel handlers - */ - protected AsyncRpcClient(Configuration configuration, String clusterId, - SocketAddress localAddress, MetricsConnection metrics, - ChannelInitializer<SocketChannel> channelInitializer) { - super(configuration, clusterId, localAddress, metrics); - - if (LOG.isDebugEnabled()) { - LOG.debug("Starting async Hbase RPC client"); - } - - Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass; - this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true); - if (useGlobalEventLoopGroup) { - eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration); - } else { - eventLoopGroupAndChannelClass = createEventLoopGroup(configuration); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Use " + (useGlobalEventLoopGroup ? 
"global" : "individual") + " event loop group " - + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName()); - } - - this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); - this.failedServers = new FailedServers(configuration); - - int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - - // Configure the default bootstrap. - this.bootstrap = new Bootstrap(); - bootstrap.group(eventLoopGroupAndChannelClass.getFirst()) - .channel(eventLoopGroupAndChannelClass.getSecond()) - .option(ChannelOption.TCP_NODELAY, tcpNoDelay) - .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout); - if (channelInitializer == null) { - channelInitializer = DEFAULT_CHANNEL_INITIALIZER; - } - bootstrap.handler(channelInitializer); - if (localAddress != null) { - bootstrap.localAddress(localAddress); - } - } - - /** Used in test only. */ - AsyncRpcClient(Configuration configuration) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null); - } - - /** Used in test only. */ - AsyncRpcClient(Configuration configuration, - ChannelInitializer<SocketChannel> channelInitializer) { - this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer); - } - - /** - * Constructor - * - * @param configuration to HBase - * @param clusterId for the cluster - * @param localAddress local address to connect to - * @param metrics the connection metrics - */ - public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, - MetricsConnection metrics) { - this(configuration, clusterId, localAddress, metrics, null); - } - - /** - * Make a call, passing <code>param</code>, to the IPC server running at - * <code>address</code> which is servicing the <code>protocol</code> protocol, - * with the <code>ticket</code> credentials, returning the value. 
- * Throws exceptions if there are network problems or if the remote code - * threw an exception. - * - * @param ticket Be careful which ticket you pass. A new user will mean a new Connection. - * {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new - * instance of User each time so will be a new Connection each time. - * @return A pair with the Message response and the Cell data (if any). - * @throws InterruptedException if call is interrupted - * @throws java.io.IOException if a connection failure is encountered - */ - @Override - protected Pair<Message, CellScanner> call(HBaseRpcController pcrc, - Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, - InetSocketAddress addr, MetricsConnection.CallStats callStats) - throws IOException, InterruptedException { - if (pcrc == null) { - pcrc = new HBaseRpcControllerImpl(); - } - final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); - - final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType, - getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), - pcrc.getPriority()); - - pcrc.notifyOnCancel(new RpcCallback<Object>() { - @Override - public void run(Object parameter) { - // Will automatically fail the promise with CancellationException - promise.cancel(true); - } - }); - - long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0; - try { - Message response = timeout > 0 ? 
promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get(); - return new Pair<>(response, pcrc.cellScanner()); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw IPCUtil.wrapException(addr, (Exception) e.getCause()); - } - } catch (TimeoutException e) { - CallTimeoutException cte = new CallTimeoutException(promise.toString()); - throw IPCUtil.wrapException(addr, cte); - } - } - - private MessageConverter<Message, Message> getMessageConverterWithRpcController( - final HBaseRpcController pcrc) { - return new - MessageConverter<Message, Message>() { - @Override - public Message convert(Message msg, CellScanner cellScanner) { - pcrc.setCellScanner(cellScanner); - return msg; - } - }; - } - - /** - * Call method async - */ - private void callMethod(final Descriptors.MethodDescriptor md, - final HBaseRpcController pcrc, final Message param, Message returnType, User ticket, - InetSocketAddress addr, final RpcCallback<Message> done) { - final AsyncRpcChannel connection; - try { - connection = createRpcChannel(md.getService().getName(), addr, ticket); - - FutureListener<Message> listener = - new FutureListener<Message>() { - @Override - public void operationComplete(Future<Message> future) throws Exception { - if (!future.isSuccess()) { - Throwable cause = future.cause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } else { - try { - done.run(future.get()); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - pcrc.setFailed((IOException) cause); - } else { - pcrc.setFailed(new IOException(cause)); - } - } catch (InterruptedException e) { - pcrc.setFailed(new IOException(e)); - } - } - } - }; - connection.callMethod(md, param, pcrc.cellScanner(), returnType, - getMessageConverterWithRpcController(pcrc), null, - pcrc.getCallTimeout(), 
pcrc.getPriority()) - .addListener(listener); - } catch (StoppedRpcClientException|FailedServerException e) { - pcrc.setFailed(e); - } - } - - private boolean closed = false; - - /** - * Close netty - */ - public void close() { - if (LOG.isDebugEnabled()) { - LOG.debug("Stopping async HBase RPC client"); - } - - synchronized (connections) { - if (closed) { - return; - } - closed = true; - for (AsyncRpcChannel conn : connections.values()) { - conn.close(null); - } - } - // do not close global EventLoopGroup. - if (!useGlobalEventLoopGroup) { - bootstrap.config().group().shutdownGracefully(); - } - } - - /** - * Create a cell scanner - * - * @param cellBlock to create scanner for - * @return CellScanner - * @throws java.io.IOException on error on creation cell scanner - */ - public CellScanner createCellScanner(byte[] cellBlock) throws IOException { - return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } - - /** - * Build cell block - * - * @param cells to create block with - * @return ByteBuffer with cells - * @throws java.io.IOException if block creation fails - */ - public ByteBuffer buildCellBlock(CellScanner cells) throws IOException { - return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells); - } - - /** - * Creates an RPC client - * - * @param serviceName name of service - * @param location to connect to - * @param ticket for current user - * @return new RpcChannel - * @throws StoppedRpcClientException when Rpc client is stopped - * @throws FailedServerException if server failed - */ - private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, - User ticket) throws StoppedRpcClientException, FailedServerException { - // Check if server is failed - if (this.failedServers.isFailedServer(location)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not trying to connect to " + location + - " this server is in the failed servers list"); - } - throw new FailedServerException( - "This server 
is in the failed servers list: " + location); - } - - int hashCode = ConnectionId.hashCode(ticket,serviceName,location); - - AsyncRpcChannel rpcChannel; - synchronized (connections) { - if (closed) { - throw new StoppedRpcClientException(); - } - rpcChannel = connections.get(hashCode); - if (rpcChannel != null && !rpcChannel.isAlive()) { - LOG.debug("Removing dead channel from server="+rpcChannel.getAddress().toString()); - connections.remove(hashCode); - } - if (rpcChannel == null) { - rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); - connections.put(hashCode, rpcChannel); - } - } - - return rpcChannel; - } - - /** - * Interrupt the connections to the given ip:port server. This should be called if the server - * is known as actually dead. This will not prevent current operation to be retried, and, - * depending on their own behavior, they may retry on the same server. This can be a feature, - * for example at startup. In any case, they're likely to get connection refused (if the - * process died) or no route to host: i.e. there next retries should be faster and with a - * safe exception. 
- * - * @param sn server to cancel connections for - */ - @Override - public void cancelConnections(ServerName sn) { - synchronized (connections) { - for (AsyncRpcChannel rpcChannel : connections.values()) { - if (rpcChannel.isAlive() && - rpcChannel.getAddress().getPort() == sn.getPort() && - rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) { - LOG.info("The server on " + sn.toString() + - " is dead - stopping the connection " + rpcChannel.toString()); - rpcChannel.close(null); - } - } - } - } - - /** - * Remove connection from pool - * @param connection to remove - */ - public void removeConnection(AsyncRpcChannel connection) { - int connectionHashCode = connection.hashCode(); - synchronized (connections) { - // we use address as cache key, so we should check here to prevent removing the - // wrong connection - AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode); - if (connectionInPool != null && connectionInPool.equals(connection)) { - this.connections.remove(connectionHashCode); - } else if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x", - connection.toString(), System.identityHashCode(connection), - System.identityHashCode(connectionInPool))); - } - } - } - - @Override - public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) { - return new RpcChannelImplementation(this, sn, user, rpcTimeout); - } - - /** - * Blocking rpc channel that goes via hbase rpc. 
- */ - @VisibleForTesting - public static class RpcChannelImplementation implements RpcChannel { - private final InetSocketAddress isa; - private final AsyncRpcClient rpcClient; - private final User ticket; - private final int channelOperationTimeout; - - /** - * @param channelOperationTimeout - the default timeout when no timeout is given - */ - protected RpcChannelImplementation(final AsyncRpcClient rpcClient, - final ServerName sn, final User ticket, int channelOperationTimeout) { - this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - this.rpcClient = rpcClient; - this.ticket = ticket; - this.channelOperationTimeout = channelOperationTimeout; - } - - @Override - public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, - Message param, Message returnType, RpcCallback<Message> done) { - HBaseRpcController pcrc = - configurePayloadCarryingRpcController(controller, channelOperationTimeout); - - this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); - } - } - - /** - * Get a new timeout on this RPC client - * @param task to run at timeout - * @param delay for the timeout - * @param unit time unit for the timeout - * @return Timeout - */ - Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - return WHEEL_TIMER.newTimeout(task, delay, unit); - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java deleted file mode 100644 index 7a2802f..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import com.google.protobuf.Message; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; - -import java.io.IOException; - -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.ipc.RemoteException; - -/** - * Handles Hbase responses - */ -@InterfaceAudience.Private -public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> { - private final AsyncRpcChannel channel; - - /** - * Constructor - * @param channel on which this response handler operates - */ - public AsyncServerResponseHandler(AsyncRpcChannel channel) { - this.channel = channel; - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception { - ByteBufInputStream in = new ByteBufInputStream(inBuffer); - int totalSize = inBuffer.readableBytes(); - // Read the header - RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - AsyncCall call = channel.removePendingCall(id); - if (call == null) { - // So we got a response for which we have no corresponding 'call' here on the client-side. - // We probably timed out waiting, cleaned up all references, and now the server decides - // to return a response. There is nothing we can do w/ the response at this stage. Clean - // out the wire of the response so its out of the way and we can get other responses on - // this connection. - int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); - int whatIsLeftToRead = totalSize - readSoFar; - - // This is done through a Netty ByteBuf which has different behavior than InputStream. 
- // It does not return number of bytes read but will update pointer internally and throws an - // exception when too many bytes are to be skipped. - inBuffer.skipBytes(whatIsLeftToRead); - return; - } - - if (responseHeader.hasException()) { - RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException(); - RemoteException re = createRemoteException(exceptionResponse); - if (exceptionResponse.getExceptionClassName() - .equals(FatalConnectionException.class.getName())) { - channel.close(re); - } else { - call.setFailed(re); - } - } else { - Message value = null; - // Call may be null because it may have timedout and been cleaned up on this side already - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - ProtobufUtil.mergeDelimitedFrom(builder, in); - value = builder.build(); - } - CellScanner cellBlockScanner = null; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - byte[] cellBlock = new byte[size]; - inBuffer.readBytes(cellBlock, 0, cellBlock.length); - cellBlockScanner = channel.client.createCellScanner(cellBlock); - } - call.setSuccess(value, cellBlockScanner); - call.callStats.setResponseSizeBytes(totalSize); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - channel.close(cause); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - channel.close(new IOException("connection closed")); - } - - /** - * @param e Proto exception - * @return RemoteException made from passed <code>e</code> - */ - private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) { - String innerExceptionClassName = e.getExceptionClassName(); - boolean doNotRetry = e.getDoNotRetry(); - return e.hasHostname() ? 
- // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), - e.getPort(), doNotRetry) - : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java index 0475e58..523ca55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java @@ -46,7 +46,7 @@ public class BlockingRpcCallback<R> implements RpcCallback<R> { synchronized (this) { result = parameter; resultSet = true; - this.notify(); + this.notifyAll(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java new file mode 100644 index 0000000..d27602e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.net.SocketAddress; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.net.NetUtils; + +/** + * Does RPC against a cluster. Manages connections per regionserver in the cluster. + * <p> + * See HBaseServer + */ +@InterfaceAudience.Private +public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> { + + protected final SocketFactory socketFactory; // how to create sockets + + /** + * Used in test only. Construct an IPC client for the cluster {@code clusterId} with the default + * SocketFactory. + */ + @VisibleForTesting + BlockingRpcClient(Configuration conf) { + this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null); + } + + /** + * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory. This + * constructor is called with reflection by the RpcClientFactory to create an instance. + * @param conf configuration + * @param clusterId the cluster id + * @param localAddr client socket bind address. 
+ * @param metrics the connection metrics + */ + public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr, + MetricsConnection metrics) { + super(conf, clusterId, localAddr, metrics); + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + } + + /** + * Creates a connection. Can be overridden by a subclass for testing. + * @param remoteId - the ConnectionId to use for the connection creation. + * @return a new BlockingRpcConnection created for this client and {@code remoteId} + * @throws IOException if the connection cannot be created + */ + protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException { + return new BlockingRpcConnection(this, remoteId); + } + + @Override + protected void closeInternal() { + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java new file mode 100644 index 0000000..c8b366d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -0,0 +1,725 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader; +import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException; +import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited; +import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException; +import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; +import static org.apache.hadoop.hbase.ipc.IPCUtil.write; + +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import com.google.protobuf.RpcCallback; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayDeque; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; + +import javax.security.sasl.SaslException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import 
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; +import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ExceptionUtil; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * Thread that reads responses and notifies callers. Each connection owns a socket connected to a + * remote address. Calls are multiplexed through this socket: responses may be delivered out of + * order. + */ +@InterfaceAudience.Private +class BlockingRpcConnection extends RpcConnection implements Runnable { + + private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class); + + private final BlockingRpcClient rpcClient; + + private final String threadName; + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "We are always under lock actually") + private Thread thread; + + // connected socket. protected for writing UT. 
+ protected Socket socket = null; + private DataInputStream in; + private DataOutputStream out; + + private HBaseSaslRpcClient saslRpcClient; + + // currently active calls + private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>(); + + private final CallSender callSender; + + private boolean closed = false; + + private byte[] connectionHeaderPreamble; + + private byte[] connectionHeaderWithLength; + + /** + * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a + * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to + * use a different thread for writing. This way, on interruptions, we either cancel the writes or + * ignore the answer if the write is already done, but we don't stop the write in the middle. This + * adds a thread per region server in the client, so it's kept as an option. + * <p> + * The implementation is simple: the client threads adds their call to the queue, and then wait + * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On + * interruption, the client cancels its call. The CallSender checks that the call has not been + * canceled before writing it. + * </p> + * When the connection closes, all the calls not yet sent are dismissed. The client thread is + * notified with an appropriate exception, as if the call was already sent but the answer not yet + * received. 
+ * </p> + */ + private class CallSender extends Thread { + + private final Queue<Call> callsToWrite; + + private final int maxQueueSize; + + public CallSender(String name, Configuration conf) { + int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000); + callsToWrite = new ArrayDeque<>(queueSize); + this.maxQueueSize = queueSize; + setDaemon(true); + setName(name + " - writer"); + } + + public void sendCall(final Call call) throws IOException { + if (callsToWrite.size() >= maxQueueSize) { + throw new IOException("Can't add the call " + call.id + + " to the write queue. callsToWrite.size()=" + callsToWrite.size()); + } + callsToWrite.offer(call); + BlockingRpcConnection.this.notifyAll(); + } + + public void remove(Call call) { + callsToWrite.remove(); + // By removing the call from the expected call list, we make the list smaller, but + // it means as well that we don't know how many calls we cancelled. + calls.remove(call.id); + call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" + + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" + + call.timeout)); + } + + /** + * Reads the call from the queue, write them on the socket. + */ + @Override + public void run() { + synchronized (BlockingRpcConnection.this) { + while (!closed) { + if (callsToWrite.isEmpty()) { + // We should use another monitor object here for better performance since the read + // thread also uses ConnectionImpl.this. But this makes the locking schema more + // complicated, can do it later as an optimization. + try { + BlockingRpcConnection.this.wait(); + } catch (InterruptedException e) { + } + // check if we need to quit, so continue the main loop instead of fallback. 
+ continue; + } + Call call = callsToWrite.poll(); + if (call.isDone()) { + continue; + } + try { + tracedWriteRequest(call); + } catch (IOException e) { + // exception here means the call has not been added to the pendingCalls yet, so we need + // to fail it by our own. + if (LOG.isDebugEnabled()) { + LOG.debug("call write error for call #" + call.id, e); + } + call.setException(e); + closeConn(e); + } + } + } + } + + /** + * Cleans the call not yet sent when we finish. + */ + public void cleanup(IOException e) { + IOException ie = new ConnectionClosingException( + "Connection to " + remoteId.address + " is closing."); + for (Call call : callsToWrite) { + call.setException(ie); + } + callsToWrite.clear(); + } + } + + BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { + super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, + rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); + this.rpcClient = rpcClient; + if (remoteId.getAddress().isUnresolved()) { + throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName()); + } + + this.connectionHeaderPreamble = getConnectionHeaderPreamble(); + ConnectionHeader header = getConnectionHeader(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize()); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(header.getSerializedSize()); + header.writeTo(dos); + assert baos.size() == 4 + header.getSerializedSize(); + this.connectionHeaderWithLength = baos.getBuffer(); + + UserGroupInformation ticket = remoteId.ticket.getUGI(); + this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() + ") connection to " + + remoteId.getAddress().toString() + + ((ticket == null) ? 
" from an unknown user" : (" from " + ticket.getUserName())); + + if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) { + callSender = new CallSender(threadName, this.rpcClient.conf); + callSender.start(); + } else { + callSender = null; + } + } + + // protected for write UT. + protected void setupConnection() throws IOException { + short ioFailures = 0; + short timeoutFailures = 0; + while (true) { + try { + this.socket = this.rpcClient.socketFactory.createSocket(); + this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay()); + this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive); + if (this.rpcClient.localAddr != null) { + this.socket.bind(this.rpcClient.localAddr); + } + NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO); + this.socket.setSoTimeout(this.rpcClient.readTO); + return; + } catch (SocketTimeoutException toe) { + /* + * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries. + */ + handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe); + } catch (IOException ie) { + handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie); + } + } + } + + /** + * Handle connection failures If the current number of retries is equal to the max number of + * retries, stop retrying and throw the exception; Otherwise backoff N seconds and try connecting + * again. This Method is only called from inside setupIOstreams(), which is synchronized. Hence + * the sleep is synchronized; the locks will be retained. 
+ * @param curRetries current number of retries + * @param maxRetries max number of retries allowed + * @param ioe failure reason + * @throws IOException if max number of retries is reached + */ + private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) + throws IOException { + closeSocket(); + + // throw the exception if the maximum number of retries is reached + if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) { + throw ioe; + } + + // otherwise back off and retry + try { + Thread.sleep(this.rpcClient.failureSleep); + } catch (InterruptedException ie) { + ExceptionUtil.rethrowIfInterrupt(ie); + } + + LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " + + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s)."); + } + + /* + * wait till someone signals us to start reading RPC response or it is idle too long, it is marked + * as to be closed, or the client is marked as not running. + * @return true if it is time to read a response; false otherwise. + */ + private synchronized boolean waitForWork() { + // beware of the concurrent access to the calls list: we can add calls, but as well + // remove them. 
+ long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose; + for (;;) { + if (thread == null) { + return false; + } + if (!calls.isEmpty()) { + return true; + } + if (EnvironmentEdgeManager.currentTime() >= waitUntil) { + closeConn( + new IOException("idle connection closed with " + calls.size() + " pending request(s)")); + return false; + } + try { + wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000)); + } catch (InterruptedException e) { + } + } + } + + @Override + public void run() { + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size()); + } + while (waitForWork()) { + readResponse(); + } + if (LOG.isTraceEnabled()) { + LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size()); + } + } + + private void disposeSasl() { + if (saslRpcClient != null) { + saslRpcClient.dispose(); + saslRpcClient = null; + } + } + + private boolean setupSaslConnection(final InputStream in2, final OutputStream out2) + throws IOException { + saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, + this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", + QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); + return saslRpcClient.saslConnect(in2, out2); + } + + /** + * If multiple clients with the same principal try to connect to the same server at the same time, + * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to + * work around this, what is done is that the client backs off randomly and tries to initiate the + * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is + * attempted. + * <p> + * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the + * user doesn't have valid credentials, we don't need to retry (from cache or ticket). 
In such + * cases, it is prudent to throw a runtime exception when we receive a SaslException from the + * underlying authentication implementation, so there is no retry from other high level (for eg, + * HCM or HBaseAdmin). + * </p> + */ + private void handleSaslConnectionFailure(final int currRetries, final int maxRetries, + final Exception ex, final UserGroupInformation user) + throws IOException, InterruptedException { + closeSocket(); + user.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws IOException, InterruptedException { + if (shouldAuthenticateOverKrb()) { + if (currRetries < maxRetries) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception encountered while connecting to " + "the server : " + ex); + } + // try re-login + relogin(); + disposeSasl(); + // have granularity of milliseconds + // we are sleeping with the Connection lock held but since this + // connection instance is being used for connecting to the server + // in question, it is okay + Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); + return null; + } else { + String msg = "Couldn't setup connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; + LOG.warn(msg, ex); + throw (IOException) new IOException(msg).initCause(ex); + } + } else { + LOG.warn("Exception encountered while connecting to " + "the server : " + ex); + } + if (ex instanceof RemoteException) { + throw (RemoteException) ex; + } + if (ex instanceof SaslException) { + String msg = "SASL authentication failed." + + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'."; + LOG.fatal(msg, ex); + throw new RuntimeException(msg, ex); + } + throw new IOException(ex); + } + }); + } + + private void setupIOstreams() throws IOException { + if (socket != null) { + // The connection is already available. Perfect. 
+ return; + } + + if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not trying to connect to " + remoteId.address + + " this server is in the failed servers list"); + } + throw new FailedServerException( + "This server is in the failed servers list: " + remoteId.address); + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to " + remoteId.address); + } + + short numRetries = 0; + final short MAX_RETRIES = 5; + while (true) { + setupConnection(); + InputStream inStream = NetUtils.getInputStream(socket); + // This creates a socket with a write timeout. This timeout cannot be changed. + OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); + // Write out the preamble -- MAGIC, version, and auth to use. + writeConnectionHeaderPreamble(outStream); + if (useSasl) { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; + UserGroupInformation ticket = getUGI(); + boolean continueSasl; + if (ticket == null) { + throw new FatalConnectionException("ticket/user is null"); + } + try { + continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws IOException { + return setupSaslConnection(in2, out2); + } + }); + } catch (Exception ex) { + ExceptionUtil.rethrowIfInterrupt(ex); + handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket); + continue; + } + if (continueSasl) { + // Sasl connect is successful. Let's set up Sasl i/o streams. + inStream = saslRpcClient.getInputStream(inStream); + outStream = saslRpcClient.getOutputStream(outStream); + } else { + // fall back to simple auth because server told us so. + // do not change authMethod and useSasl here, we should start from secure when + // reconnecting because regionserver may change its sasl config after restart. 
+ } + } + this.in = new DataInputStream(new BufferedInputStream(inStream)); + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + // Now write out the connection header + writeConnectionHeader(); + break; + } + } catch (Throwable t) { + closeSocket(); + IOException e = ExceptionUtil.asInterrupt(t); + if (e == null) { + this.rpcClient.failedServers.addToFailedServers(remoteId.address); + if (t instanceof LinkageError) { + // probably the hbase hadoop version does not match the running hadoop version + e = new DoNotRetryIOException(t); + } else if (t instanceof IOException) { + e = (IOException) t; + } else { + e = new IOException("Could not set up IO Streams to " + remoteId.address, t); + } + } + throw e; + } + + // start the receiver thread after the socket connection has been set up + thread = new Thread(this, threadName); + thread.setDaemon(true); + thread.start(); + } + + /** + * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>} + */ + private void writeConnectionHeaderPreamble(OutputStream out) throws IOException { + out.write(connectionHeaderPreamble); + out.flush(); + } + + /** + * Write the connection header. + */ + private void writeConnectionHeader() throws IOException { + this.out.write(connectionHeaderWithLength); + this.out.flush(); + } + + private void tracedWriteRequest(Call call) throws IOException { + try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) { + writeRequest(call); + } + } + + /** + * Initiates a call by sending the parameter to the remote server. Note: this is not called from + * the Connection thread, but by other threads. 
+ * @see #readResponse() + */ + private void writeRequest(Call call) throws IOException { + ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, + this.compressor, call.cells); + CellBlockMeta cellBlockMeta; + if (cellBlock != null) { + cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build(); + } else { + cellBlockMeta = null; + } + RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); + + setupIOstreams(); + + // Now we're going to write the call. We take the lock, then check that the connection + // is still valid, and, if so we do the write to the socket. If the write fails, we don't + // know where we stand, we have to close the connection. + if (Thread.interrupted()) { + throw new InterruptedIOException(); + } + + calls.put(call.id, call); // We put first as we don't want the connection to become idle. + // from here, we do not throw any exception to upper layer as the call has been tracked in the + // pending calls map. + try { + call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); + } catch (IOException e) { + closeConn(e); + return; + } + notifyAll(); + } + + /* + * Receive a response. Because only one receiver, so no synchronization on in. + */ + private void readResponse() { + Call call = null; + boolean expectedCall = false; + try { + // See HBaseServer.Call.setResponse for where we write out the response. + // Total size of the response. Unused. But have to read it in anyways. + int totalSize = in.readInt(); + + // Read the header + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); + int id = responseHeader.getCallId(); + call = calls.remove(id); // call.done have to be set before leaving this method + expectedCall = (call != null && !call.isDone()); + if (!expectedCall) { + // So we got a response for which we have no corresponding 'call' here on the client-side. 
+ // We probably timed out waiting, cleaned up all references, and now the server decides + // to return a response. There is nothing we can do w/ the response at this stage. Clean + // out the wire of the response so its out of the way and we can get other responses on + // this connection. + int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader); + int whatIsLeftToRead = totalSize - readSoFar; + IOUtils.skipFully(in, whatIsLeftToRead); + if (call != null) { + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + } + return; + } + if (responseHeader.hasException()) { + ExceptionResponse exceptionResponse = responseHeader.getException(); + RemoteException re = createRemoteException(exceptionResponse); + call.setException(re); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + if (isFatalConnectionException(exceptionResponse)) { + synchronized (this) { + closeConn(re); + } + } + } else { + Message value = null; + if (call.responseDefaultType != null) { + Builder builder = call.responseDefaultType.newBuilderForType(); + ProtobufUtil.mergeDelimitedFrom(builder, in); + value = builder.build(); + } + CellScanner cellBlockScanner = null; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + byte[] cellBlock = new byte[size]; + IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); + cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, + this.compressor, cellBlock); + } + call.setResponse(value, cellBlockScanner); + call.callStats.setResponseSizeBytes(totalSize); + call.callStats + .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); + } + } catch (IOException e) { + if (expectedCall) { + call.setException(e); + } + if (e instanceof 
SocketTimeoutException) { + // Clean up open calls but don't treat this as a fatal condition, + // since we expect certain responses to not make it by the specified + // {@link ConnectionId#rpcTimeout}. + if (LOG.isTraceEnabled()) { + LOG.trace("ignored", e); + } + } else { + synchronized (this) { + closeConn(e); + } + } + } + } + + @Override + protected synchronized void callTimeout(Call call) { + // call sender + calls.remove(call.id); + } + + // just close socket input and output. + private void closeSocket() { + IOUtils.closeStream(out); + IOUtils.closeStream(in); + IOUtils.closeSocket(socket); + out = null; + in = null; + socket = null; + } + + // close socket, reader, and clean up all pending calls. + private void closeConn(IOException e) { + if (thread == null) { + return; + } + thread.interrupt(); + thread = null; + closeSocket(); + if (callSender != null) { + callSender.cleanup(e); + } + for (Call call : calls.values()) { + call.setException(e); + } + calls.clear(); + } + + // release all resources, the connection will not be used any more. 
+ @Override + public synchronized void shutdown() { + closed = true; + if (callSender != null) { + callSender.interrupt(); + } + closeConn(new IOException("connection to " + remoteId.address + " closed")); + } + + @Override + public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) + throws IOException { + pcrc.notifyOnCancel(new RpcCallback<Object>() { + + @Override + public void run(Object parameter) { + setCancelled(call); + synchronized (BlockingRpcConnection.this) { + if (callSender != null) { + callSender.remove(call); + } else { + calls.remove(call.id); + } + } + } + }, new CancellationCallback() { + + @Override + public void run(boolean cancelled) throws IOException { + if (cancelled) { + setCancelled(call); + return; + } + scheduleTimeoutTask(call); + if (callSender != null) { + callSender.sendCall(call); + } else { + tracedWriteRequest(call); + } + } + }); + } + + @Override + public synchronized boolean isActive() { + return thread != null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java new file mode 100644 index 0000000..573ddd5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * We will expose the connection to upper layer before initialized, so we need to buffer the calls + * passed in and write them out once the connection is established. + */ +@InterfaceAudience.Private +class BufferCallBeforeInitHandler extends ChannelDuplexHandler { + + private enum BufferCallAction { + FLUSH, FAIL + } + + public static final class BufferCallEvent { + + public final BufferCallAction action; + + public final IOException error; + + private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action, + IOException error) { + this.action = action; + this.error = error; + } + + public static BufferCallBeforeInitHandler.BufferCallEvent success() { + return SUCCESS_EVENT; + } + + public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) { + return new BufferCallEvent(BufferCallAction.FAIL, error); + } + } + + private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH, + null); + + private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg instanceof Call) { + Call call = (Call) msg; + id2Call.put(call.id, call); + } else { 
+ ctx.write(msg, promise); + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof BufferCallEvent) { + BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt; + switch (bcEvt.action) { + case FLUSH: + for (Call call : id2Call.values()) { + ctx.write(call); + } + break; + case FAIL: + for (Call call : id2Call.values()) { + call.setException(bcEvt.error); + } + break; + } + ctx.flush(); + ctx.pipeline().remove(this); + } else if (evt instanceof CallEvent) { + // just remove the call for now until we add other call event other than timeout and cancel. + id2Call.remove(((CallEvent) evt).call.id); + } else { + ctx.fireUserEventTriggered(evt); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 73bc0e2..a6203d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.ipc; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; + +import io.netty.util.Timeout; import java.io.IOException; @@ -27,29 +30,39 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; /** A call waiting for a value. 
*/ @InterfaceAudience.Private -public class Call { - final int id; // call id - final Message param; // rpc request method param object +class Call { + final int id; // call id + final Message param; // rpc request method param object /** - * Optionally has cells when making call. Optionally has cells set on response. Used - * passing cells to the rpc and receiving the response. + * Optionally has cells when making call. Optionally has cells set on response. Used passing cells + * to the rpc and receiving the response. */ CellScanner cells; - Message response; // value, null if error - // The return type. Used to create shell into which we deserialize the response if any. + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Direct access is only allowed after done") + Message response; // value, null if error + // The return type. Used to create shell into which we deserialize the response if any. Message responseDefaultType; - IOException error; // exception, null if value - volatile boolean done; // true when call is done + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Direct access is only allowed after done") + IOException error; // exception, null if value + private boolean done; // true when call is done final Descriptors.MethodDescriptor md; final int timeout; // timeout in millisecond for this call; 0 means infinite. 
+ final int priority; final MetricsConnection.CallStats callStats; + final RpcCallback<Call> callback; + final Span span; + Timeout timeoutTask; protected Call(int id, final Descriptors.MethodDescriptor md, Message param, - final CellScanner cells, final Message responseDefaultType, int timeout, - MetricsConnection.CallStats callStats) { + final CellScanner cells, final Message responseDefaultType, int timeout, int priority, + RpcCallback<Call> callback, MetricsConnection.CallStats callStats) { this.param = param; this.md = md; this.cells = cells; @@ -58,73 +71,74 @@ public class Call { this.responseDefaultType = responseDefaultType; this.id = id; this.timeout = timeout; + this.priority = priority; + this.callback = callback; + this.span = Trace.currentSpan(); + } + + @Override + public String toString() { + return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}"; } /** - * Check if the call did timeout. Set an exception (includes a notify) if it's the case. - * @return true if the call is on timeout, false otherwise. 
+ * called from timeoutTask, prevent self cancel */ - public boolean checkAndSetTimeout() { - if (timeout == 0){ - return false; - } - - long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime(); - if (waitTime >= timeout) { - IOException ie = new CallTimeoutException("Call id=" + id + - ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired."); - setException(ie); // includes a notify - return true; - } else { - return false; + public void setTimeout(IOException error) { + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; } + callback.run(this); } - public int remainingTime() { - if (timeout == 0) { - return Integer.MAX_VALUE; + private void callComplete() { + if (timeoutTask != null) { + timeoutTask.cancel(); } - - int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime()); - return remaining > 0 ? remaining : 0; - } - - @Override - public String toString() { - return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" + - (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}"; + callback.run(this); } - /** Indicate when the call is complete and the - * value or error are available. Notifies by default. */ - protected synchronized void callComplete() { - this.done = true; - notify(); // notify caller - } - - /** Set the exception when there is an error. - * Notify the caller the call is done. - * + /** + * Set the exception when there is an error. Notify the caller the call is done. * @param error exception thrown by the call; either local or remote */ public void setException(IOException error) { - this.error = error; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.error = error; + } callComplete(); } /** - * Set the return value when there is no error. - * Notify the caller the call is done. - * + * Set the return value when there is no error. Notify the caller the call is done. 
* @param response return value of the call. * @param cells Can be null */ public void setResponse(Message response, final CellScanner cells) { - this.response = response; - this.cells = cells; + synchronized (this) { + if (done) { + return; + } + this.done = true; + this.response = response; + this.cells = cells; + } callComplete(); } + public synchronized boolean isDone() { + return done; + } + public long getStartTime() { return this.callStats.getStartTime(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java new file mode 100644 index 0000000..a6777c0 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Client side call cancelled. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CallCancelledException extends HBaseIOException { + + private static final long serialVersionUID = 309775809470318208L; + + public CallCancelledException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java new file mode 100644 index 0000000..1c2ea32 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to tell netty handler the call is cancelled, timeout... + */ +@InterfaceAudience.Private +class CallEvent { + + public enum Type { + TIMEOUT, CANCELLED + } + + final Type type; + + final Call call; + + CallEvent(Type type, Call call) { + this.type = type; + this.call = call; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java index 1e31f72..db8c34a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; - +import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -28,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @SuppressWarnings("serial") @InterfaceAudience.Public @InterfaceStability.Evolving -public class CallTimeoutException extends IOException { +public class CallTimeoutException extends HBaseIOException { + public CallTimeoutException(final String msg) { super(msg); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java 
index 072a490..fb2cafa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.ipc; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; + import java.io.IOException; import java.io.OutputStream; import java.nio.BufferOverflowException; @@ -46,12 +50,13 @@ import org.apache.hadoop.io.compress.Decompressor; * Helper class for building cell block. */ @InterfaceAudience.Private -public class CellBlockBuilder { +class CellBlockBuilder { // LOG is being used in TestCellBlockBuilder static final Log LOG = LogFactory.getLog(CellBlockBuilder.class); private final Configuration conf; + /** * How much we think the decompressor will expand the original compressed content. */ @@ -59,7 +64,7 @@ public class CellBlockBuilder { private final int cellBlockBuildingInitialBufferSize; - public CellBlockBuilder(final Configuration conf) { + public CellBlockBuilder(Configuration conf) { this.conf = conf; this.cellBlockDecompressionMultiplier = conf .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3); @@ -70,44 +75,104 @@ public class CellBlockBuilder { .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024)); } + private interface OutputStreamSupplier { + + OutputStream get(int expectedSize); + + int size(); + } + + private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier { + + private ByteBufferOutputStream baos; + + @Override + public OutputStream get(int expectedSize) { + baos = new ByteBufferOutputStream(expectedSize); + return baos; + } + + @Override + public int size() { + return baos.size(); + } + } + /** * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or * <code>compressor</code>. 
- * @param codec to use for encoding - * @param compressor to use for encoding - * @param cellScanner to encode + * @param codec + * @param compressor + * @param cellScanner * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has * been flipped and is ready for reading. Use limit to find total size. - * @throws IOException if encoding the cells fail + * @throws IOException */ public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor, final CellScanner cellScanner) throws IOException { - if (cellScanner == null) { + ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier(); + if (buildCellBlock(codec, compressor, cellScanner, supplier)) { + ByteBuffer bb = supplier.baos.getByteBuffer(); + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). + return bb.hasRemaining() ? 
bb : null; + } else { return null; } + } + + private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier { + + private final ByteBufAllocator alloc; + + private ByteBuf buf; + + public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) { + this.alloc = alloc; + } + + @Override + public OutputStream get(int expectedSize) { + buf = alloc.buffer(expectedSize); + return new ByteBufOutputStream(buf); + } + + @Override + public int size() { + return buf.writerIndex(); + } + } + + public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner, + ByteBufAllocator alloc) throws IOException { + ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc); + if (buildCellBlock(codec, compressor, cellScanner, supplier)) { + return supplier.buf; + } else { + return null; + } + } + + private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor, + final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException { + if (cellScanner == null) { + return false; + } if (codec == null) { throw new CellScannerButNoCodecException(); } - int bufferSize = this.cellBlockBuildingInitialBufferSize; - ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize); - encodeCellsTo(baos, cellScanner, codec, compressor); - if (LOG.isTraceEnabled()) { - if (bufferSize < baos.size()) { - LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() - + "; up hbase.ipc.cellblock.building.initial.buffersize?"); - } + int bufferSize = cellBlockBuildingInitialBufferSize; + encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor); + if (LOG.isTraceEnabled() && bufferSize < supplier.size()) { + LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() + + "; up hbase.ipc.cellblock.building.initial.buffersize?"); } - ByteBuffer bb = baos.getByteBuffer(); - // If no cells, don't mess around. 
Just return null (could be a bunch of existence checking - // gets or something -- stuff that does not return a cell). - if (!bb.hasRemaining()) return null; - return bb; + return true; } - private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec, + private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec, CompressionCodec compressor) throws IOException { - OutputStream os = bbos; Compressor poolCompressor = null; try { if (compressor != null) { @@ -122,7 +187,7 @@ public class CellBlockBuilder { encoder.write(cellScanner.current()); } encoder.flush(); - } catch (BufferOverflowException e) { + } catch (BufferOverflowException | IndexOutOfBoundsException e) { throw new DoNotRetryIOException(e); } finally { os.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java index 08f8171..1b837d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.security.User; * to servers are uniquely identified by <remoteAddress, ticket, serviceName> */ @InterfaceAudience.Private -public class ConnectionId { +class ConnectionId { private static final int PRIME = 16777619; final User ticket; final String serviceName; http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java new file mode 100644 index 0000000..c7c0f32 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Pair; + +/** + * The default netty event loop config + */ +@InterfaceAudience.Private +class DefaultNettyEventLoopConfig { + + public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair + .<EventLoopGroup, Class<? 
extends Channel>> newPair(new NioEventLoopGroup(), + NioSocketChannel.class); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java new file mode 100644 index 0000000..721148b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Indicate that the rpc server tells client to fallback to simple auth but client is disabled to do + * so. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FallbackDisallowedException extends HBaseIOException { + + private static final long serialVersionUID = -6942845066279358253L; + + public FallbackDisallowedException() { + super("Server asks us to fall back to SIMPLE auth, " + + "but this client is configured to only allow secure connections."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java deleted file mode 100644 index 09dda09..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Converts exceptions to other exceptions - */ -@InterfaceAudience.Private -public interface IOExceptionConverter { - /** - * Converts given IOException - * @param e exception to convert - * @return converted IOException - */ - IOException convert(IOException e); -}