http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
deleted file mode 100644
index 5f4d2f4..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ /dev/null
@@ -1,510 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.JVM;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.Threads;
-
-/**
- * Netty client for the requests and responses
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class AsyncRpcClient extends AbstractRpcClient {
-
-  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
-  public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
-  public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
-  public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
-
-  private static final HashedWheelTimer WHEEL_TIMER =
-      new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
-          100, TimeUnit.MILLISECONDS);
-
-  private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
-      new ChannelInitializer<SocketChannel>() {
-    @Override
-    protected void initChannel(SocketChannel ch) throws Exception {
-      //empty initializer
-    }
-  };
-
-  protected final AtomicInteger callIdCnt = new AtomicInteger();
-
-  private final PoolMap<Integer, AsyncRpcChannel> connections;
-
-  final FailedServers failedServers;
-
-  @VisibleForTesting
-  final Bootstrap bootstrap;
-
-  private final boolean useGlobalEventLoopGroup;
-
-  @VisibleForTesting
  static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
-
-  synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
-      getGlobalEventLoopGroup(Configuration conf) {
-    if (GLOBAL_EVENT_LOOP_GROUP == null) {
-      GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create global event loop group "
-            + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
-      }
-    }
-    return GLOBAL_EVENT_LOOP_GROUP;
-  }
-
-  private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
-      Configuration conf) {
-    // Max amount of threads to use. 0 lets Netty decide based on amount of cores
-    int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
-
-    // Config to enable native transport. Does not seem to be stable at time of implementation
-    // although it is not extensively tested.
-    boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
-
-    // Use the faster native epoll transport mechanism on linux if enabled
-    if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create EpollEventLoopGroup with maxThreads = " + 
maxThreads);
-      }
-      return new Pair<EventLoopGroup, Class<? extends Channel>>(new 
EpollEventLoopGroup(maxThreads,
-          Threads.newDaemonThreadFactory("AsyncRpcChannel")), 
EpollSocketChannel.class);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
-      }
-      return new Pair<EventLoopGroup, Class<? extends Channel>>(new 
NioEventLoopGroup(maxThreads,
-          Threads.newDaemonThreadFactory("AsyncRpcChannel")), 
NioSocketChannel.class);
-    }
-  }
-
-  /**
-   * Constructor for tests
-   *
-   * @param configuration      to HBase
-   * @param clusterId          for the cluster
-   * @param localAddress       local address to connect to
-   * @param metrics            the connection metrics
-   * @param channelInitializer for custom channel handlers
-   */
-  protected AsyncRpcClient(Configuration configuration, String clusterId,
-      SocketAddress localAddress, MetricsConnection metrics,
-      ChannelInitializer<SocketChannel> channelInitializer) {
-    super(configuration, clusterId, localAddress, metrics);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting async Hbase RPC client");
-    }
-
-    Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
-    this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
-    if (useGlobalEventLoopGroup) {
-      eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
-    } else {
-      eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + 
" event loop group "
-          + 
eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
-    }
-
-    this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
-    this.failedServers = new FailedServers(configuration);
-
-    int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-
-    // Configure the default bootstrap.
-    this.bootstrap = new Bootstrap();
-    bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
-        .channel(eventLoopGroupAndChannelClass.getSecond())
-        .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
-        .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
-    if (channelInitializer == null) {
-      channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
-    }
-    bootstrap.handler(channelInitializer);
-    if (localAddress != null) {
-      bootstrap.localAddress(localAddress);
-    }
-  }
-
-  /** Used in test only. */
-  AsyncRpcClient(Configuration configuration) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
-  }
-
-  /** Used in test only. */
-  AsyncRpcClient(Configuration configuration,
-      ChannelInitializer<SocketChannel> channelInitializer) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
-  }
-
-  /**
-   * Constructor
-   *
-   * @param configuration to HBase
-   * @param clusterId     for the cluster
-   * @param localAddress  local address to connect to
-   * @param metrics       the connection metrics
-   */
-  public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
-      MetricsConnection metrics) {
-    this(configuration, clusterId, localAddress, metrics, null);
-  }
-
-  /**
-   * Make a call, passing <code>param</code>, to the IPC server running at
-   * <code>address</code> which is servicing the <code>protocol</code> protocol,
-   * with the <code>ticket</code> credentials, returning the value.
-   * Throws exceptions if there are network problems or if the remote code
-   * threw an exception.
-   *
-   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
-   *               {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
-   *               instance of User each time so will be a new Connection each time.
-   * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException if call is interrupted
-   * @throws java.io.IOException  if a connection failure is encountered
-   */
-  @Override
-  protected Pair<Message, CellScanner> call(HBaseRpcController pcrc,
-      Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress addr, MetricsConnection.CallStats callStats)
-      throws IOException, InterruptedException {
-    if (pcrc == null) {
-      pcrc = new HBaseRpcControllerImpl();
-    }
-    final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
-    final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
-        getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
-        pcrc.getPriority());
-
-    pcrc.notifyOnCancel(new RpcCallback<Object>() {
-      @Override
-      public void run(Object parameter) {
-        // Will automatically fail the promise with CancellationException
-        promise.cancel(true);
-      }
-    });
-
-    long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
-    try {
-      Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
-      return new Pair<>(response, pcrc.cellScanner());
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      } else {
-        throw IPCUtil.wrapException(addr, (Exception) e.getCause());
-      }
-    } catch (TimeoutException e) {
-      CallTimeoutException cte = new CallTimeoutException(promise.toString());
-      throw IPCUtil.wrapException(addr, cte);
-    }
-  }
-
-  private MessageConverter<Message, Message> getMessageConverterWithRpcController(
-      final HBaseRpcController pcrc) {
-    return new
-      MessageConverter<Message, Message>() {
-        @Override
-        public Message convert(Message msg, CellScanner cellScanner) {
-          pcrc.setCellScanner(cellScanner);
-          return msg;
-        }
-      };
-  }
-
-  /**
-   * Call method async
-   */
-  private void callMethod(final Descriptors.MethodDescriptor md,
-      final HBaseRpcController pcrc, final Message param, Message returnType, User ticket,
-      InetSocketAddress addr, final RpcCallback<Message> done) {
-    final AsyncRpcChannel connection;
-    try {
-      connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
-      FutureListener<Message> listener =
-        new FutureListener<Message>() {
-          @Override
-          public void operationComplete(Future<Message> future) throws Exception {
-            if (!future.isSuccess()) {
-              Throwable cause = future.cause();
-              if (cause instanceof IOException) {
-                pcrc.setFailed((IOException) cause);
-              } else {
-                pcrc.setFailed(new IOException(cause));
-              }
-            } else {
-              try {
-                done.run(future.get());
-              } catch (ExecutionException e) {
-                Throwable cause = e.getCause();
-                if (cause instanceof IOException) {
-                  pcrc.setFailed((IOException) cause);
-                } else {
-                  pcrc.setFailed(new IOException(cause));
-                }
-              } catch (InterruptedException e) {
-                pcrc.setFailed(new IOException(e));
-              }
-            }
-          }
-        };
-      connection.callMethod(md, param, pcrc.cellScanner(), returnType,
-          getMessageConverterWithRpcController(pcrc), null,
-          pcrc.getCallTimeout(), pcrc.getPriority())
-          .addListener(listener);
-    } catch (StoppedRpcClientException|FailedServerException e) {
-      pcrc.setFailed(e);
-    }
-  }
-
-  private boolean closed = false;
-
-  /**
-   * Close netty
-   */
-  public void close() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Stopping async HBase RPC client");
-    }
-
-    synchronized (connections) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      for (AsyncRpcChannel conn : connections.values()) {
-        conn.close(null);
-      }
-    }
-    // do not close global EventLoopGroup.
-    if (!useGlobalEventLoopGroup) {
-      bootstrap.config().group().shutdownGracefully();
-    }
-  }
-
-  /**
-   * Create a cell scanner
-   *
-   * @param cellBlock to create scanner for
-   * @return CellScanner
-   * @throws java.io.IOException on error on creation cell scanner
-   */
-  public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
-    return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
-  }
-
-  /**
-   * Build cell block
-   *
-   * @param cells to create block with
-   * @return ByteBuffer with cells
-   * @throws java.io.IOException if block creation fails
-   */
-  public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
-    return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells);
-  }
-
-  /**
-   * Creates an RPC client
-   *
-   * @param serviceName    name of service
-   * @param location       to connect to
-   * @param ticket         for current user
-   * @return new RpcChannel
-   * @throws StoppedRpcClientException when Rpc client is stopped
-   * @throws FailedServerException if server failed
-   */
-  private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
-      User ticket) throws StoppedRpcClientException, FailedServerException {
-    // Check if server is failed
-    if (this.failedServers.isFailedServer(location)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not trying to connect to " + location +
-            " this server is in the failed servers list");
-      }
-      throw new FailedServerException(
-          "This server is in the failed servers list: " + location);
-    }
-
-    int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
-
-    AsyncRpcChannel rpcChannel;
-    synchronized (connections) {
-      if (closed) {
-        throw new StoppedRpcClientException();
-      }
-      rpcChannel = connections.get(hashCode);
-      if (rpcChannel != null && !rpcChannel.isAlive()) {
-        LOG.debug("Removing dead channel from 
server="+rpcChannel.getAddress().toString());
-        connections.remove(hashCode);
-      }
-      if (rpcChannel == null) {
-        rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
-        connections.put(hashCode, rpcChannel);
-      }
-    }
-
-    return rpcChannel;
-  }
-
-  /**
-   * Interrupt the connections to the given ip:port server. This should be called if the server
-   * is known to actually be dead. This will not prevent current operations from being retried, and,
-   * depending on their own behavior, they may retry on the same server. This can be a feature,
-   * for example at startup. In any case, they're likely to get connection refused (if the
-   * process died) or no route to host: i.e. their next retries should be faster and with a
-   * safe exception.
-   *
-   * @param sn server to cancel connections for
-   */
-  @Override
-  public void cancelConnections(ServerName sn) {
-    synchronized (connections) {
-      for (AsyncRpcChannel rpcChannel : connections.values()) {
-        if (rpcChannel.isAlive() &&
-            rpcChannel.getAddress().getPort() == sn.getPort() &&
-            rpcChannel.getAddress().getHostName().contentEquals(sn.getHostname())) {
-          LOG.info("The server on " + sn.toString() +
-              " is dead - stopping the connection " + rpcChannel.toString());
-          rpcChannel.close(null);
-        }
-      }
-    }
-  }
-
-  /**
-   * Remove connection from pool
-   * @param connection to remove
-   */
-  public void removeConnection(AsyncRpcChannel connection) {
-    int connectionHashCode = connection.hashCode();
-    synchronized (connections) {
-      // we use address as cache key, so we should check here to prevent removing the
-      // wrong connection
-      AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
-      if (connectionInPool != null && connectionInPool.equals(connection)) {
-        this.connections.remove(connectionHashCode);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("%s already removed, expected instance %08x, 
actual %08x",
-          connection.toString(), System.identityHashCode(connection),
-          System.identityHashCode(connectionInPool)));
-      }
-    }
-  }
-
-  @Override
-  public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
-    return new RpcChannelImplementation(this, sn, user, rpcTimeout);
-  }
-
-  /**
-   * Blocking rpc channel that goes via hbase rpc.
-   */
-  @VisibleForTesting
-  public static class RpcChannelImplementation implements RpcChannel {
-    private final InetSocketAddress isa;
-    private final AsyncRpcClient rpcClient;
-    private final User ticket;
-    private final int channelOperationTimeout;
-
-    /**
-     * @param channelOperationTimeout - the default timeout when no timeout is given
-     */
-    protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
-        final ServerName sn, final User ticket, int channelOperationTimeout) {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      this.rpcClient = rpcClient;
-      this.ticket = ticket;
-      this.channelOperationTimeout = channelOperationTimeout;
-    }
-
-    @Override
-    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
-        Message param, Message returnType, RpcCallback<Message> done) {
-      HBaseRpcController pcrc =
-          configurePayloadCarryingRpcController(controller, channelOperationTimeout);
-
-      this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
-    }
-  }
-
-  /**
-   * Get a new timeout on this RPC client
-   * @param task to run at timeout
-   * @param delay for the timeout
-   * @param unit time unit for the timeout
-   * @return Timeout
-   */
-  Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
-    return WHEEL_TIMER.newTimeout(task, delay, unit);
-  }
-}

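A note on the transport selection that the deleted createEventLoopGroup() above implemented: choosing between the native epoll transport and NIO is a standard Netty pattern. Below is a minimal standalone sketch of the same idea, assuming Netty's Epoll.isAvailable() check in place of the JVM.isLinux()/JVM.isAmd64() test the deleted code used; class and method names here are illustrative, not part of the patch.

    import io.netty.channel.Channel;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.epoll.Epoll;
    import io.netty.channel.epoll.EpollEventLoopGroup;
    import io.netty.channel.epoll.EpollSocketChannel;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.apache.hadoop.hbase.util.Pair;

    // Sketch: prefer the faster native epoll transport when it is enabled and
    // available, otherwise fall back to portable NIO. maxThreads == 0 lets
    // Netty size the group from the number of cores, as noted in the diff.
    final class TransportChooser {
      static Pair<EventLoopGroup, Class<? extends Channel>> choose(boolean epollEnabled,
          int maxThreads) {
        if (epollEnabled && Epoll.isAvailable()) {
          return new Pair<EventLoopGroup, Class<? extends Channel>>(
              new EpollEventLoopGroup(maxThreads), EpollSocketChannel.class);
        }
        return new Pair<EventLoopGroup, Class<? extends Channel>>(
            new NioEventLoopGroup(maxThreads), NioSocketChannel.class);
      }
    }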
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
deleted file mode 100644
index 7a2802f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Message;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.ipc.RemoteException;
-
-/**
- * Handles Hbase responses
- */
-@InterfaceAudience.Private
-public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
-  private final AsyncRpcChannel channel;
-
-  /**
-   * Constructor
-   * @param channel on which this response handler operates
-   */
-  public AsyncServerResponseHandler(AsyncRpcChannel channel) {
-    this.channel = channel;
-  }
-
-  @Override
-  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception {
-    ByteBufInputStream in = new ByteBufInputStream(inBuffer);
-    int totalSize = inBuffer.readableBytes();
-    // Read the header
-    RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
-    int id = responseHeader.getCallId();
-    AsyncCall call = channel.removePendingCall(id);
-    if (call == null) {
-      // So we got a response for which we have no corresponding 'call' here on the client-side.
-      // We probably timed out waiting, cleaned up all references, and now the server decides
-      // to return a response. There is nothing we can do w/ the response at this stage. Clean
-      // out the wire of the response so it's out of the way and we can get other responses on
-      // this connection.
-      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
-      int whatIsLeftToRead = totalSize - readSoFar;
-
-      // This is done through a Netty ByteBuf which has different behavior than InputStream.
-      // It does not return number of bytes read but will update pointer internally and throws an
-      // exception when too many bytes are to be skipped.
-      inBuffer.skipBytes(whatIsLeftToRead);
-      return;
-    }
-
-    if (responseHeader.hasException()) {
-      RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
-      RemoteException re = createRemoteException(exceptionResponse);
-      if (exceptionResponse.getExceptionClassName()
-          .equals(FatalConnectionException.class.getName())) {
-        channel.close(re);
-      } else {
-        call.setFailed(re);
-      }
-    } else {
-      Message value = null;
-      // Call may be null because it may have timed out and been cleaned up on this side already
-      if (call.responseDefaultType != null) {
-        Message.Builder builder = call.responseDefaultType.newBuilderForType();
-        ProtobufUtil.mergeDelimitedFrom(builder, in);
-        value = builder.build();
-      }
-      CellScanner cellBlockScanner = null;
-      if (responseHeader.hasCellBlockMeta()) {
-        int size = responseHeader.getCellBlockMeta().getLength();
-        byte[] cellBlock = new byte[size];
-        inBuffer.readBytes(cellBlock, 0, cellBlock.length);
-        cellBlockScanner = channel.client.createCellScanner(cellBlock);
-      }
-      call.setSuccess(value, cellBlockScanner);
-      call.callStats.setResponseSizeBytes(totalSize);
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    channel.close(cause);
-  }
-
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    channel.close(new IOException("connection closed"));
-  }
-
-  /**
-   * @param e Proto exception
-   * @return RemoteException made from passed <code>e</code>
-   */
-  private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
-    String innerExceptionClassName = e.getExceptionClassName();
-    boolean doNotRetry = e.getDoNotRetry();
-    return e.hasHostname() ?
-        // If a hostname then add it to the RemoteWithExtrasException
-        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
-            e.getPort(), doNotRetry)
-        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
-  }
-}

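The orphaned-response handling in channelRead0() above is the subtle part of this deleted handler: when no pending call matches the response's call id, the remainder of the response must still be consumed so the next response on the wire can be decoded. A minimal sketch of that skip, assuming the header has already been parsed as in the handler (the class and method names here are illustrative):

    import io.netty.buffer.ByteBuf;

    final class ResponseSkipper {
      // totalSize is the readable size captured before parsing the header;
      // headerBytesRead is IPCUtil.getTotalSizeWhenWrittenDelimited(header).
      static void skipOrphanedResponse(ByteBuf inBuffer, int totalSize, int headerBytesRead) {
        int whatIsLeftToRead = totalSize - headerBytesRead;
        // skipBytes advances readerIndex; unlike InputStream.skip it throws
        // if fewer than that many bytes are readable.
        inBuffer.skipBytes(whatIsLeftToRead);
      }
    }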
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
index 0475e58..523ca55 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcCallback.java
@@ -46,7 +46,7 @@ public class BlockingRpcCallback<R> implements RpcCallback<R> {
     synchronized (this) {
       result = parameter;
       resultSet = true;
-      this.notify();
+      this.notifyAll();
     }
   }
 

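Context for the one-line change above: notify() wakes a single, arbitrary waiter, so if more than one thread can block on this callback the others may hang; notifyAll() wakes every waiter, and each one rechecks its condition. A sketch of the waiting side this pairs with (the get() body below is an assumption for illustration, not part of the hunk):

    // Hypothetical waiting side paired with the run() shown in the hunk.
    public synchronized R get() throws InterruptedException {
      while (!resultSet) {   // guard against spurious wakeups
        wait();              // released by notifyAll() in run()
      }
      return result;
    }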
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
new file mode 100644
index 0000000..d27602e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * Does RPC against a cluster. Manages connections per regionserver in the cluster.
+ * <p>
+ * See HBaseServer
+ */
+@InterfaceAudience.Private
+public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> {
+
+  protected final SocketFactory socketFactory; // how to create sockets
+
+  /**
+   * Used in test only. Construct an IPC client for the cluster {@code clusterId} with the default
+   * SocketFactory
+   */
+  @VisibleForTesting
+  BlockingRpcClient(Configuration conf) {
+    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+  }
+
+  /**
+   * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory. This
+   * method is called with reflection by the RpcClientFactory to create an instance.
+   * @param conf configuration
+   * @param clusterId the cluster id
+   * @param localAddr client socket bind address.
+   * @param metrics the connection metrics
+   */
+  public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+      MetricsConnection metrics) {
+    super(conf, clusterId, localAddr, metrics);
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+  }
+
+  /**
+   * Creates a connection. Can be overridden by a subclass for testing.
+   * @param remoteId - the ConnectionId to use for the connection creation.
+   */
+  protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+    return new BlockingRpcConnection(this, remoteId);
+  }
+
+  @Override
+  protected void closeInternal() {
+  }
+}

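As the Javadoc above notes, createConnection() exists so tests can substitute the connection, and setupConnection() in BlockingRpcConnection (next file) is protected for the same reason. A hypothetical sketch of such an override, assuming same-package access; FailingRpcClient is not part of this patch:

    // Same package, so the package-private pieces are reachable.
    class FailingRpcClient extends BlockingRpcClient {
      FailingRpcClient(Configuration conf) {
        super(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
      }

      @Override
      protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
        return new BlockingRpcConnection(this, remoteId) {
          @Override
          protected void setupConnection() throws IOException {
            // simulate an unreachable server for retry/failed-server tests
            throw new IOException("injected connect failure");
          }
        };
      }
    }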
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
new file mode 100644
index 0000000..c8b366d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcCallback;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
+ * remote address. Calls are multiplexed through this socket: responses may be delivered out of
+ * order.
+ */
+@InterfaceAudience.Private
+class BlockingRpcConnection extends RpcConnection implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class);
+
+  private final BlockingRpcClient rpcClient;
+
+  private final String threadName;
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "We are always under lock actually")
+  private Thread thread;
+
+  // connected socket. protected for writing UT.
+  protected Socket socket = null;
+  private DataInputStream in;
+  private DataOutputStream out;
+
+  private HBaseSaslRpcClient saslRpcClient;
+
+  // currently active calls
+  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
+
+  private final CallSender callSender;
+
+  private boolean closed = false;
+
+  private byte[] connectionHeaderPreamble;
+
+  private byte[] connectionHeaderWithLength;
+
+  /**
+   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
+   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
+   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
+   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
+   * adds a thread per region server in the client, so it's kept as an option.
+   * <p>
+   * The implementation is simple: the client threads add their call to the queue, and then wait
+   * for an answer. The CallSender blocks on the queue, and writes the calls one after the other. On
+   * interruption, the client cancels its call. The CallSender checks that the call has not been
+   * canceled before writing it.
+   * </p>
+   * <p>
+   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
+   * notified with an appropriate exception, as if the call was already sent but the answer not yet
+   * received.
+   * </p>
+   */
+  private class CallSender extends Thread {
+
+    private final Queue<Call> callsToWrite;
+
+    private final int maxQueueSize;
+
+    public CallSender(String name, Configuration conf) {
+      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+      callsToWrite = new ArrayDeque<>(queueSize);
+      this.maxQueueSize = queueSize;
+      setDaemon(true);
+      setName(name + " - writer");
+    }
+
+    public void sendCall(final Call call) throws IOException {
+      if (callsToWrite.size() >= maxQueueSize) {
+        throw new IOException("Can't add the call " + call.id
+            + " to the write queue. callsToWrite.size()=" + 
callsToWrite.size());
+      }
+      callsToWrite.offer(call);
+      BlockingRpcConnection.this.notifyAll();
+    }
+
+    public void remove(Call call) {
+      callsToWrite.remove();
+      // By removing the call from the expected call list, we make the list smaller, but
+      // it means as well that we don't know how many calls we cancelled.
+      calls.remove(call.id);
+      call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime="
+          + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
+          + call.timeout));
+    }
+
+    /**
+     * Reads the calls from the queue and writes them to the socket.
+     */
+    @Override
+    public void run() {
+      synchronized (BlockingRpcConnection.this) {
+        while (!closed) {
+          if (callsToWrite.isEmpty()) {
+            // We should use another monitor object here for better performance since the read
+            // thread also uses ConnectionImpl.this. But this makes the locking schema more
+            // complicated, can do it later as an optimization.
+            try {
+              BlockingRpcConnection.this.wait();
+            } catch (InterruptedException e) {
+            }
+            // check if we need to quit, so continue the main loop instead of fallback.
+            continue;
+          }
+          Call call = callsToWrite.poll();
+          if (call.isDone()) {
+            continue;
+          }
+          try {
+            tracedWriteRequest(call);
+          } catch (IOException e) {
+            // exception here means the call has not been added to the pendingCalls yet, so we need
+            // to fail it by our own.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("call write error for call #" + call.id, e);
+            }
+            call.setException(e);
+            closeConn(e);
+          }
+        }
+      }
+    }
+
+    /**
+     * Cleans the call not yet sent when we finish.
+     */
+    public void cleanup(IOException e) {
+      IOException ie = new ConnectionClosingException(
+          "Connection to " + remoteId.address + " is closing.");
+      for (Call call : callsToWrite) {
+        call.setException(ie);
+      }
+      callsToWrite.clear();
+    }
+  }
+
+  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
+    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+    this.rpcClient = rpcClient;
+    if (remoteId.getAddress().isUnresolved()) {
+      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
+    }
+
+    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
+    ConnectionHeader header = getConnectionHeader();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
+    DataOutputStream dos = new DataOutputStream(baos);
+    dos.writeInt(header.getSerializedSize());
+    header.writeTo(dos);
+    assert baos.size() == 4 + header.getSerializedSize();
+    this.connectionHeaderWithLength = baos.getBuffer();
+
+    UserGroupInformation ticket = remoteId.ticket.getUGI();
+    this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() 
+ ") connection to "
+        + remoteId.getAddress().toString()
+        + ((ticket == null) ? " from an unknown user" : (" from " + 
ticket.getUserName()));
+
+    if 
(this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, 
false)) {
+      callSender = new CallSender(threadName, this.rpcClient.conf);
+      callSender.start();
+    } else {
+      callSender = null;
+    }
+  }
+
+  // protected for write UT.
+  protected void setupConnection() throws IOException {
+    short ioFailures = 0;
+    short timeoutFailures = 0;
+    while (true) {
+      try {
+        this.socket = this.rpcClient.socketFactory.createSocket();
+        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
+        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
+        if (this.rpcClient.localAddr != null) {
+          this.socket.bind(this.rpcClient.localAddr);
+        }
+        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
+        this.socket.setSoTimeout(this.rpcClient.readTO);
+        return;
+      } catch (SocketTimeoutException toe) {
+        /*
+         * The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.
+         */
+        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
+      } catch (IOException ie) {
+        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
+      }
+    }
+  }
+
+  /**
+   * Handle connection failures. If the current number of retries is equal to the max number of
+   * retries, stop retrying and throw the exception; otherwise back off N seconds and try connecting
+   * again. This method is only called from inside setupIOstreams(), which is synchronized. Hence
+   * the sleep is synchronized; the locks will be retained.
+   * @param curRetries current number of retries
+   * @param maxRetries max number of retries allowed
+   * @param ioe failure reason
+   * @throws IOException if max number of retries is reached
+   */
+  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
+      throws IOException {
+    closeSocket();
+
+    // throw the exception if the maximum number of retries is reached
+    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
+      throw ioe;
+    }
+
+    // otherwise back off and retry
+    try {
+      Thread.sleep(this.rpcClient.failureSleep);
+    } catch (InterruptedException ie) {
+      ExceptionUtil.rethrowIfInterrupt(ie);
+    }
+
+    LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after 
sleeping "
+        + this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " 
time(s).");
+  }
+
+  /*
+   * Wait till someone signals us to start reading the RPC response, or the connection is idle too
+   * long and marked to be closed, or the client is marked as not running.
+   * @return true if it is time to read a response; false otherwise.
+   */
+  private synchronized boolean waitForWork() {
+    // beware of the concurrent access to the calls list: we can add calls, but as well
+    // remove them.
+    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
+    for (;;) {
+      if (thread == null) {
+        return false;
+      }
+      if (!calls.isEmpty()) {
+        return true;
+      }
+      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
+        closeConn(
+          new IOException("idle connection closed with " + calls.size() + " 
pending request(s)"));
+        return false;
+      }
+      try {
+        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(threadName + ": starting, connections " + 
this.rpcClient.connections.size());
+    }
+    while (waitForWork()) {
+      readResponse();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(threadName + ": stopped, connections " + 
this.rpcClient.connections.size());
+    }
+  }
+
+  private void disposeSasl() {
+    if (saslRpcClient != null) {
+      saslRpcClient.dispose();
+      saslRpcClient = null;
+    }
+  }
+
+  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
+      throws IOException {
+    saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
+        this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
+          QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
+    return saslRpcClient.saslConnect(in2, out2);
+  }
+
+  /**
+   * If multiple clients with the same principal try to connect to the same server at the same time,
+   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
+   * work around this, what is done is that the client backs off randomly and tries to initiate the
+   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
+   * attempted.
+   * <p>
+   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
+   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
+   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
+   * underlying authentication implementation, so there is no retry from other high level (for eg,
+   * HCM or HBaseAdmin).
+   * </p>
+   */
+  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
+      final Exception ex, final UserGroupInformation user)
+      throws IOException, InterruptedException {
+    closeSocket();
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws IOException, InterruptedException {
+        if (shouldAuthenticateOverKrb()) {
+          if (currRetries < maxRetries) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Exception encountered while connecting to " + "the 
server : " + ex);
+            }
+            // try re-login
+            relogin();
+            disposeSasl();
+            // have granularity of milliseconds
+            // we are sleeping with the Connection lock held but since this
+            // connection instance is being used for connecting to the server
+            // in question, it is okay
+            Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
+            return null;
+          } else {
+            String msg = "Couldn't setup connection for "
+                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+            LOG.warn(msg, ex);
+            throw (IOException) new IOException(msg).initCause(ex);
+          }
+        } else {
+          LOG.warn("Exception encountered while connecting to " + "the server 
: " + ex);
+        }
+        if (ex instanceof RemoteException) {
+          throw (RemoteException) ex;
+        }
+        if (ex instanceof SaslException) {
+          String msg = "SASL authentication failed."
+              + " The most likely cause is missing or invalid credentials." + 
" Consider 'kinit'.";
+          LOG.fatal(msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+        throw new IOException(ex);
+      }
+    });
+  }
+
+  private void setupIOstreams() throws IOException {
+    if (socket != null) {
+      // The connection is already available. Perfect.
+      return;
+    }
+
+    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not trying to connect to " + remoteId.address
+            + " this server is in the failed servers list");
+      }
+      throw new FailedServerException(
+          "This server is in the failed servers list: " + remoteId.address);
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to " + remoteId.address);
+      }
+
+      short numRetries = 0;
+      final short MAX_RETRIES = 5;
+      while (true) {
+        setupConnection();
+        InputStream inStream = NetUtils.getInputStream(socket);
+        // This creates a socket with a write timeout. This timeout cannot be changed.
+        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
+        // Write out the preamble -- MAGIC, version, and auth to use.
+        writeConnectionHeaderPreamble(outStream);
+        if (useSasl) {
+          final InputStream in2 = inStream;
+          final OutputStream out2 = outStream;
+          UserGroupInformation ticket = getUGI();
+          boolean continueSasl;
+          if (ticket == null) {
+            throw new FatalConnectionException("ticket/user is null");
+          }
+          try {
+            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+              @Override
+              public Boolean run() throws IOException {
+                return setupSaslConnection(in2, out2);
+              }
+            });
+          } catch (Exception ex) {
+            ExceptionUtil.rethrowIfInterrupt(ex);
+            handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket);
+            continue;
+          }
+          if (continueSasl) {
+            // Sasl connect is successful. Let's set up Sasl i/o streams.
+            inStream = saslRpcClient.getInputStream(inStream);
+            outStream = saslRpcClient.getOutputStream(outStream);
+          } else {
+            // fall back to simple auth because server told us so.
+            // do not change authMethod and useSasl here, we should start from secure when
+            // reconnecting because regionserver may change its sasl config after restart.
+          }
+        }
+        this.in = new DataInputStream(new BufferedInputStream(inStream));
+        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+        // Now write out the connection header
+        writeConnectionHeader();
+        break;
+      }
+    } catch (Throwable t) {
+      closeSocket();
+      IOException e = ExceptionUtil.asInterrupt(t);
+      if (e == null) {
+        this.rpcClient.failedServers.addToFailedServers(remoteId.address);
+        if (t instanceof LinkageError) {
+          // probably the hbase hadoop version does not match the running hadoop version
+          e = new DoNotRetryIOException(t);
+        } else if (t instanceof IOException) {
+          e = (IOException) t;
+        } else {
+          e = new IOException("Could not set up IO Streams to " + 
remoteId.address, t);
+        }
+      }
+      throw e;
+    }
+
+    // start the receiver thread after the socket connection has been set up
+    thread = new Thread(this, threadName);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  /**
+   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
+   */
+  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
+    out.write(connectionHeaderPreamble);
+    out.flush();
+  }
+
+  /**
+   * Write the connection header.
+   */
+  private void writeConnectionHeader() throws IOException {
+    this.out.write(connectionHeaderWithLength);
+    this.out.flush();
+  }
+
+  private void tracedWriteRequest(Call call) throws IOException {
+    try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
+      writeRequest(call);
+    }
+  }
+
+  /**
+   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
+   * the Connection thread, but by other threads.
+   * @see #readResponse()
+   */
+  private void writeRequest(Call call) throws IOException {
+    ByteBuffer cellBlock = this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec,
+      this.compressor, call.cells);
+    CellBlockMeta cellBlockMeta;
+    if (cellBlock != null) {
+      cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
+    } else {
+      cellBlockMeta = null;
+    }
+    RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+
+    setupIOstreams();
+
+    // Now we're going to write the call. We take the lock, then check that the connection
+    // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+    // know where we stand, we have to close the connection.
+    if (Thread.interrupted()) {
+      throw new InterruptedIOException();
+    }
+
+    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+    // from here, we do not throw any exception to upper layer as the call has been tracked in the
+    // pending calls map.
+    try {
+      call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+    } catch (IOException e) {
+      closeConn(e);
+      return;
+    }
+    notifyAll();
+  }
+
+  /*
+   * Receive a response. There is only one receiver thread, so no synchronization on in is needed.
+   */
+  private void readResponse() {
+    Call call = null;
+    boolean expectedCall = false;
+    try {
+      // See HBaseServer.Call.setResponse for where we write out the response.
+      // Total size of the response. Unused. But have to read it in anyways.
+      int totalSize = in.readInt();
+
+      // Read the header
+      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+      int id = responseHeader.getCallId();
+      call = calls.remove(id); // call.done has to be set before leaving this method
+      expectedCall = (call != null && !call.isDone());
+      if (!expectedCall) {
+        // So we got a response for which we have no corresponding 'call' here on the client-side.
+        // We probably timed out waiting, cleaned up all references, and now the server decides
+        // to return a response. There is nothing we can do w/ the response at this stage. Clean
+        // out the wire of the response so it's out of the way and we can get other responses on
+        // this connection.
+        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
+        int whatIsLeftToRead = totalSize - readSoFar;
+        IOUtils.skipFully(in, whatIsLeftToRead);
+        if (call != null) {
+          call.callStats.setResponseSizeBytes(totalSize);
+          call.callStats
+              .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+        }
+        return;
+      }
+      if (responseHeader.hasException()) {
+        ExceptionResponse exceptionResponse = responseHeader.getException();
+        RemoteException re = createRemoteException(exceptionResponse);
+        call.setException(re);
+        call.callStats.setResponseSizeBytes(totalSize);
+        call.callStats
+            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+        if (isFatalConnectionException(exceptionResponse)) {
+          synchronized (this) {
+            closeConn(re);
+          }
+        }
+      } else {
+        Message value = null;
+        if (call.responseDefaultType != null) {
+          Builder builder = call.responseDefaultType.newBuilderForType();
+          ProtobufUtil.mergeDelimitedFrom(builder, in);
+          value = builder.build();
+        }
+        CellScanner cellBlockScanner = null;
+        if (responseHeader.hasCellBlockMeta()) {
+          int size = responseHeader.getCellBlockMeta().getLength();
+          byte[] cellBlock = new byte[size];
+          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
+            this.compressor, cellBlock);
+        }
+        call.setResponse(value, cellBlockScanner);
+        call.callStats.setResponseSizeBytes(totalSize);
+        call.callStats
+            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+      }
+    } catch (IOException e) {
+      if (expectedCall) {
+        call.setException(e);
+      }
+      if (e instanceof SocketTimeoutException) {
+        // Clean up open calls but don't treat this as a fatal condition,
+        // since we expect certain responses to not make it by the specified
+        // {@link ConnectionId#rpcTimeout}.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("ignored", e);
+        }
+      } else {
+        synchronized (this) {
+          closeConn(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected synchronized void callTimeout(Call call) {
+    // call sender
+    calls.remove(call.id);
+  }
+
+  // just close socket input and output.
+  private void closeSocket() {
+    IOUtils.closeStream(out);
+    IOUtils.closeStream(in);
+    IOUtils.closeSocket(socket);
+    out = null;
+    in = null;
+    socket = null;
+  }
+
+  // close socket, reader, and clean up all pending calls.
+  private void closeConn(IOException e) {
+    if (thread == null) {
+      return;
+    }
+    thread.interrupt();
+    thread = null;
+    closeSocket();
+    if (callSender != null) {
+      callSender.cleanup(e);
+    }
+    for (Call call : calls.values()) {
+      call.setException(e);
+    }
+    calls.clear();
+  }
+
+  // release all resources, the connection will not be used any more.
+  @Override
+  public synchronized void shutdown() {
+    closed = true;
+    if (callSender != null) {
+      callSender.interrupt();
+    }
+    closeConn(new IOException("connection to " + remoteId.address + " closed"));
+  }
+
+  @Override
+  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
+      throws IOException {
+    pcrc.notifyOnCancel(new RpcCallback<Object>() {
+
+      @Override
+      public void run(Object parameter) {
+        setCancelled(call);
+        synchronized (BlockingRpcConnection.this) {
+          if (callSender != null) {
+            callSender.remove(call);
+          } else {
+            calls.remove(call.id);
+          }
+        }
+      }
+    }, new CancellationCallback() {
+
+      @Override
+      public void run(boolean cancelled) throws IOException {
+        if (cancelled) {
+          setCancelled(call);
+          return;
+        }
+        scheduleTimeoutTask(call);
+        if (callSender != null) {
+          callSender.sendCall(call);
+        } else {
+          tracedWriteRequest(call);
+        }
+      }
+    });
+  }
+
+  @Override
+  public synchronized boolean isActive() {
+    return thread != null;
+  }
+}
\ No newline at end of file
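
An aside on the readResponse() path above: correctness hinges on the pending-calls map keyed by call id, and on draining any response that no longer has a caller so the stream stays aligned for the next response. A minimal, self-contained sketch of that bookkeeping follows; PendingCall, ResponseDemux, and the simplified wire format (an int length then an int call id) are illustrative stand-ins, not the HBase types or protocol.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch only: demultiplex responses onto pending calls by id.
    class ResponseDemux {

      static final class PendingCall {
        final int id;
        volatile boolean done;
        PendingCall(int id) { this.id = id; }
      }

      private final ConcurrentMap<Integer, PendingCall> calls = new ConcurrentHashMap<>();

      void track(PendingCall call) {
        calls.put(call.id, call); // track before writing, as writeRequest() does above
      }

      void onResponse(DataInputStream in) throws IOException {
        int totalSize = in.readInt(); // total length of this response (simplified)
        int id = in.readInt();        // call id echoed by the server (simplified)
        PendingCall call = calls.remove(id);
        if (call == null || call.done) {
          // No matching caller, probably timed out and cleaned up already.
          // Drain the remaining bytes so the next response starts cleanly.
          long toSkip = totalSize - 4; // the 4-byte call id was already consumed
          while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
              throw new IOException("unexpected end of stream");
            }
            toSkip -= skipped;
          }
          return;
        }
        // ... deserialize the body and complete the call ...
      }
    }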

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..573ddd5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * We expose the connection to the upper layer before it is initialized, so we need to buffer
+ * the calls passed in and write them out once the connection is established.
+ */
+@InterfaceAudience.Private
+class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+  private enum BufferCallAction {
+    FLUSH, FAIL
+  }
+
+  public static final class BufferCallEvent {
+
+    public final BufferCallAction action;
+
+    public final IOException error;
+
+    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+        IOException error) {
+      this.action = action;
+      this.error = error;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+      return SUCCESS_EVENT;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
+      return new BufferCallEvent(BufferCallAction.FAIL, error);
+    }
+  }
+
+  private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
+      null);
+
+  private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+    if (msg instanceof Call) {
+      Call call = (Call) msg;
+      id2Call.put(call.id, call);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+    if (evt instanceof BufferCallEvent) {
+      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+      switch (bcEvt.action) {
+        case FLUSH:
+          for (Call call : id2Call.values()) {
+            ctx.write(call);
+          }
+          break;
+        case FAIL:
+          for (Call call : id2Call.values()) {
+            call.setException(bcEvt.error);
+          }
+          break;
+      }
+      ctx.flush();
+      ctx.pipeline().remove(this);
+    } else if (evt instanceof CallEvent) {
+      // Just remove the call for now, until we add call events other than timeout and cancel.
+      id2Call.remove(((CallEvent) evt).call.id);
+    } else {
+      ctx.fireUserEventTriggered(evt);
+    }
+  }
+
+}
\ No newline at end of file
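
The handler above is essentially a "park writes until ready" gate that removes itself from the pipeline once connection setup finishes. A reduced sketch of the same pattern, using only Netty types plus a hypothetical READY marker event (the HBase version instead keys calls by id and distinguishes FLUSH from FAIL), might look like this:

    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: buffer outbound messages until the pipeline signals readiness.
    class BufferUntilReadyHandler extends ChannelDuplexHandler {

      static final Object READY = new Object(); // hypothetical marker event

      private final List<Object> buffered = new ArrayList<>();

      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        buffered.add(msg); // park the message instead of passing it down
      }

      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == READY) {
          for (Object msg : buffered) {
            ctx.write(msg);
          }
          buffered.clear();
          ctx.flush();
          ctx.pipeline().remove(this); // later writes bypass the buffer entirely
        } else {
          ctx.fireUserEventTriggered(evt);
        }
      }
    }

Once negotiation completes, whoever drives the connection would fire channel.pipeline().fireUserEventTriggered(BufferUntilReadyHandler.READY), mirroring how BufferCallEvent.success() is delivered here.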

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 73bc0e2..a6203d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+
+import io.netty.util.Timeout;
 
 import java.io.IOException;
 
@@ -27,29 +30,39 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
 
 /** A call waiting for a value. */
 @InterfaceAudience.Private
-public class Call {
-  final int id;                                 // call id
-  final Message param;                          // rpc request method param object
+class Call {
+  final int id; // call id
+  final Message param; // rpc request method param object
   /**
-   * Optionally has cells when making call.  Optionally has cells set on response.  Used
-   * passing cells to the rpc and receiving the response.
+   * Optionally has cells when making the call. Optionally has cells set on the response. Used to
+   * pass cells to the rpc and receive the response.
    */
   CellScanner cells;
-  Message response;                             // value, null if error
-  // The return type.  Used to create shell into which we deserialize the response if any.
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "Direct access is only allowed after done")
+  Message response; // value, null if error
+  // The return type. Used to create a shell into which we deserialize the response, if any.
   Message responseDefaultType;
-  IOException error;                            // exception, null if value
-  volatile boolean done;                                 // true when call is done
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+    justification = "Direct access is only allowed after done")
+  IOException error; // exception, null if value
+  private boolean done; // true when call is done
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
+  final int priority;
   final MetricsConnection.CallStats callStats;
+  final RpcCallback<Call> callback;
+  final Span span;
+  Timeout timeoutTask;
 
   protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
-      final CellScanner cells, final Message responseDefaultType, int timeout,
-      MetricsConnection.CallStats callStats) {
+      final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
+      RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
@@ -58,73 +71,74 @@ public class Call {
     this.responseDefaultType = responseDefaultType;
     this.id = id;
     this.timeout = timeout;
+    this.priority = priority;
+    this.callback = callback;
+    this.span = Trace.currentSpan();
+  }
+
+  @Override
+  public String toString() {
+    return "callId: " + this.id + " methodName: " + this.md.getName() + " 
param {"
+        + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : 
"") + "}";
   }
 
   /**
-   * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
-   * @return true if the call is on timeout, false otherwise.
+   * Called from timeoutTask; prevents self-cancel.
    */
-  public boolean checkAndSetTimeout() {
-    if (timeout == 0){
-      return false;
-    }
-
-    long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
-    if (waitTime >= timeout) {
-      IOException ie = new CallTimeoutException("Call id=" + id +
-          ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " 
expired.");
-      setException(ie); // includes a notify
-      return true;
-    } else {
-      return false;
+  public void setTimeout(IOException error) {
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.error = error;
     }
+    callback.run(this);
   }
 
-  public int remainingTime() {
-    if (timeout == 0) {
-      return Integer.MAX_VALUE;
+  private void callComplete() {
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
     }
-
-    int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
-    return remaining > 0 ? remaining : 0;
-  }
-
-  @Override
-  public String toString() {
-    return "callId: " + this.id + " methodName: " + this.md.getName() + " 
param {" +
-      (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + 
"}";
+    callback.run(this);
   }
 
-  /** Indicate when the call is complete and the
-   * value or error are available.  Notifies by default.  */
-  protected synchronized void callComplete() {
-    this.done = true;
-    notify();                                 // notify caller
-  }
-
-  /** Set the exception when there is an error.
-   * Notify the caller the call is done.
-   *
+  /**
+   * Set the exception when there is an error. Notify the caller the call is done.
    * @param error exception thrown by the call; either local or remote
    */
   public void setException(IOException error) {
-    this.error = error;
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.error = error;
+    }
     callComplete();
   }
 
   /**
-   * Set the return value when there is no error.
-   * Notify the caller the call is done.
-   *
+   * Set the return value when there is no error. Notify the caller the call is done.
    * @param response return value of the call.
    * @param cells Can be null
    */
   public void setResponse(Message response, final CellScanner cells) {
-    this.response = response;
-    this.cells = cells;
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.response = response;
+      this.cells = cells;
+    }
     callComplete();
   }
 
+  public synchronized boolean isDone() {
+    return done;
+  }
+
   public long getStartTime() {
     return this.callStats.getStartTime();
   }
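
The reworked Call above replaces wait/notify with a one-shot done flag: whichever of setResponse, setException, or setTimeout runs first wins, state is mutated under the monitor, and the completion callback is deliberately invoked outside it so user code never runs while the lock is held. A generic sketch of that first-completion-wins pattern, with illustrative names rather than the HBase API:

    import java.io.IOException;
    import java.util.function.Consumer;

    // Sketch only: a result that can be completed exactly once.
    class OneShotResult<T> {

      private final Consumer<OneShotResult<T>> callback;
      private boolean done; // guarded by this
      private T value;
      private IOException error;

      OneShotResult(Consumer<OneShotResult<T>> callback) {
        this.callback = callback;
      }

      void complete(T value) {
        synchronized (this) {
          if (done) {
            return; // a racing timeout or cancel already finished the call
          }
          done = true;
          this.value = value;
        }
        callback.accept(this); // run user code outside the monitor
      }

      void fail(IOException error) {
        synchronized (this) {
          if (done) {
            return;
          }
          done = true;
          this.error = error;
        }
        callback.accept(this);
      }

      synchronized boolean isDone() {
        return done;
      }
    }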

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
new file mode 100644
index 0000000..a6777c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Client side call cancelled.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallCancelledException extends HBaseIOException {
+
+  private static final long serialVersionUID = 309775809470318208L;
+
+  public CallCancelledException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
new file mode 100644
index 0000000..1c2ea32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to tell the netty handler that the call is cancelled, timed out, etc.
+ */
+@InterfaceAudience.Private
+class CallEvent {
+
+  public enum Type {
+    TIMEOUT, CANCELLED
+  }
+
+  final Type type;
+
+  final Call call;
+
+  CallEvent(Type type, Call call) {
+    this.type = type;
+    this.call = call;
+  }
+}
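
Such an event is typically fired down the pipeline so every handler tracking the call can forget it; the buffering handler above does exactly that in its userEventTriggered branch. A hedged usage sketch (assuming code in the same org.apache.hadoop.hbase.ipc package, since CallEvent is package-private, and a Channel reference in scope):

    // Sketch only: tell the pipeline that 'call' was cancelled.
    static void fireCancelled(io.netty.channel.Channel channel, Call call) {
      channel.pipeline().fireUserEventTriggered(new CallEvent(CallEvent.Type.CANCELLED, call));
    }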

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
index 1e31f72..db8c34a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallTimeoutException.java
@@ -17,8 +17,7 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.IOException;
-
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
@@ -28,7 +27,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @SuppressWarnings("serial")
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class CallTimeoutException extends IOException {
+public class CallTimeoutException extends HBaseIOException {
+
   public CallTimeoutException(final String msg) {
     super(msg);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
index 072a490..fb2cafa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
@@ -46,12 +50,13 @@ import org.apache.hadoop.io.compress.Decompressor;
  * Helper class for building cell block.
  */
 @InterfaceAudience.Private
-public class CellBlockBuilder {
+class CellBlockBuilder {
 
   // LOG is being used in TestCellBlockBuilder
   static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
 
   private final Configuration conf;
+
   /**
    * How much we think the decompressor will expand the original compressed content.
    */
@@ -59,7 +64,7 @@ public class CellBlockBuilder {
 
   private final int cellBlockBuildingInitialBufferSize;
 
-  public CellBlockBuilder(final Configuration conf) {
+  public CellBlockBuilder(Configuration conf) {
     this.conf = conf;
     this.cellBlockDecompressionMultiplier = conf
         .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
@@ -70,44 +75,104 @@ public class CellBlockBuilder {
         .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
   }
 
+  private interface OutputStreamSupplier {
+
+    OutputStream get(int expectedSize);
+
+    int size();
+  }
+
+  private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
+
+    private ByteBufferOutputStream baos;
+
+    @Override
+    public OutputStream get(int expectedSize) {
+      baos = new ByteBufferOutputStream(expectedSize);
+      return baos;
+    }
+
+    @Override
+    public int size() {
+      return baos.size();
+    }
+  }
+
   /**
    * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
    * <code>compressor</code>.
-   * @param codec to use for encoding
-   * @param compressor to use for encoding
-   * @param cellScanner to encode
+   * @param codec
+   * @param compressor
+   * @param cellScanner
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
    *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
    *         been flipped and is ready for reading. Use limit to find total size.
-   * @throws IOException if encoding the cells fail
+   * @throws IOException
    */
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
       final CellScanner cellScanner) throws IOException {
-    if (cellScanner == null) {
+    ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
+    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+      ByteBuffer bb = supplier.baos.getByteBuffer();
+      // If no cells, don't mess around. Just return null (could be a bunch of 
existence checking
+      // gets or something -- stuff that does not return a cell).
+      return bb.hasRemaining() ? bb : null;
+    } else {
       return null;
     }
+  }
+
+  private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
+
+    private final ByteBufAllocator alloc;
+
+    private ByteBuf buf;
+
+    public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
+      this.alloc = alloc;
+    }
+
+    @Override
+    public OutputStream get(int expectedSize) {
+      buf = alloc.buffer(expectedSize);
+      return new ByteBufOutputStream(buf);
+    }
+
+    @Override
+    public int size() {
+      return buf.writerIndex();
+    }
+  }
+
+  public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
+      ByteBufAllocator alloc) throws IOException {
+    ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
+    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+      return supplier.buf;
+    } else {
+      return null;
+    }
+  }
+
+  private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
+    if (cellScanner == null) {
+      return false;
+    }
     if (codec == null) {
       throw new CellScannerButNoCodecException();
     }
-    int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
-    encodeCellsTo(baos, cellScanner, codec, compressor);
-    if (LOG.isTraceEnabled()) {
-      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " 
+ baos.size()
-            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
-      }
+    int bufferSize = cellBlockBuildingInitialBufferSize;
+    encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
+    if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
+      LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + 
supplier.size()
+          + "; up hbase.ipc.cellblock.building.initial.buffersize?");
     }
-    ByteBuffer bb = baos.getByteBuffer();
-    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
-    // gets or something -- stuff that does not return a cell).
-    if (!bb.hasRemaining()) return null;
-    return bb;
+    return true;
   }
 
-  private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+  private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
       CompressionCodec compressor) throws IOException {
-    OutputStream os = bbos;
     Compressor poolCompressor = null;
     try {
       if (compressor != null) {
@@ -122,7 +187,7 @@ public class CellBlockBuilder {
         encoder.write(cellScanner.current());
       }
       encoder.flush();
-    } catch (BufferOverflowException e) {
+    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
       throw new DoNotRetryIOException(e);
     } finally {
       os.close();
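
The OutputStreamSupplier refactoring above lets the cell-block encoding be written once while each caller chooses the backing buffer: a ByteBufferOutputStream for the blocking client and a pooled Netty ByteBuf for the async one. Reduced to standard-library types, the shape of the pattern is roughly the following; StreamSupplier, HeapSupplier, and Encoder are illustrative names, not the HBase API:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Sketch only: the encoder sees an OutputStream; the supplier picks what backs it.
    interface StreamSupplier {
      OutputStream get(int expectedSize) throws IOException;
      int size();
    }

    final class HeapSupplier implements StreamSupplier {
      private ByteArrayOutputStream out;

      @Override
      public OutputStream get(int expectedSize) {
        out = new ByteArrayOutputStream(expectedSize);
        return out;
      }

      @Override
      public int size() {
        return out.size();
      }
    }

    class Encoder {
      // Encode once, against whatever buffer the supplier hands back.
      static int encode(byte[] payload, StreamSupplier supplier) throws IOException {
        try (OutputStream os = supplier.get(payload.length)) {
          os.write(payload);
        }
        return supplier.size();
      }
    }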

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
index 08f8171..1b837d8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionId.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.security.User;
  * to servers are uniquely identified by &lt;remoteAddress, ticket, serviceName&gt;
  */
 @InterfaceAudience.Private
-public class ConnectionId {
+class ConnectionId {
   private static final int PRIME = 16777619;
   final User ticket;
   final String serviceName;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
new file mode 100644
index 0000000..c7c0f32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The default netty event loop config
+ */
+@InterfaceAudience.Private
+class DefaultNettyEventLoopConfig {
+
+  public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair
+      .<EventLoopGroup, Class<? extends Channel>> newPair(new NioEventLoopGroup(),
+        NioSocketChannel.class);
+}
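
Keeping the event loop group and the channel class together in one Pair means a client can be wired up from a single shared default. A hedged usage sketch, assuming code in the same package (the class is package-private) and placeholder pipeline contents:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import org.apache.hadoop.hbase.util.Pair;

    // Sketch only: configure a Bootstrap from the shared default above.
    class BootstrapFromDefaults {
      static Bootstrap create() {
        Pair<EventLoopGroup, Class<? extends Channel>> cfg =
            DefaultNettyEventLoopConfig.GROUP_AND_CHANNEL_CLASS;
        return new Bootstrap()
            .group(cfg.getFirst())
            .channel(cfg.getSecond())
            .handler(new ChannelInitializer<Channel>() {
              @Override
              protected void initChannel(Channel ch) {
                // protocol handlers would be added here
              }
            });
      }
    }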

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
new file mode 100644
index 0000000..721148b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that the rpc server told the client to fall back to simple auth, but the client is
+ * configured to disallow it.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FallbackDisallowedException extends HBaseIOException {
+
+  private static final long serialVersionUID = -6942845066279358253L;
+
+  public FallbackDisallowedException() {
+    super("Server asks us to fall back to SIMPLE auth, "
+        + "but this client is configured to only allow secure connections.");
+  }
+}
\ No newline at end of file
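
For context, this exception marks one branch of connection setup: if the server answers the security preamble by asking for SIMPLE auth and the client's fallback flag is off, negotiation must abort. A minimal sketch of that guard, with a hypothetical fallbackAllowed flag:

    // Sketch only: refuse a server-initiated downgrade to simple auth.
    static void checkFallback(boolean serverAsksSimpleAuth, boolean fallbackAllowed)
        throws FallbackDisallowedException {
      if (serverAsksSimpleAuth && !fallbackAllowed) {
        throw new FallbackDisallowedException();
      }
    }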

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
deleted file mode 100644
index 09dda09..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IOExceptionConverter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Converts exceptions to other exceptions
- */
-@InterfaceAudience.Private
-public interface IOExceptionConverter {
-  /**
-   * Converts given IOException
-   * @param e exception to convert
-   * @return converted IOException
-   */
-  IOException convert(IOException e);
-}
