Repository: hbase
Updated Branches:
  refs/heads/master 01c0448cc -> ccd8888b4


Revert "HBASE-15278 AsyncRPCClient hangs if Connection closes before RPC call 
response"

This reverts commit 01c0448ccd943186ba8045074a59e53f8f08c364.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ccd8888b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ccd8888b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ccd8888b

Branch: refs/heads/master
Commit: ccd8888b4b5cbb5580633bc204c82edb25707287
Parents: 01c0448
Author: chenheng <chenh...@apache.org>
Authored: Sat Apr 30 11:37:05 2016 +0800
Committer: chenheng <chenh...@apache.org>
Committed: Sat Apr 30 11:37:05 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  6 --
 .../hbase/ipc/AsyncServerResponseHandler.java   |  8 ++-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 69 +-------------------
 3 files changed, 9 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ccd8888b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index ef3240c..53eb824 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -210,12 +210,6 @@ public class AsyncRpcChannel {
     ch.pipeline().addLast("frameDecoder",
       new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    ch.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        close(null);
-      }
-    });
     try {
       writeChannelHeader(ch).addListener(new 
GenericFutureListener<ChannelFuture>() {
         @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccd8888b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index 5c604a4..e0c7586 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -103,6 +105,11 @@ public class AsyncServerResponseHandler extends 
SimpleChannelInboundHandler<Byte
     channel.close(cause);
   }
 
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    channel.close(new IOException("connection closed"));
+  }
+
   /**
    * @param e Proto exception
    * @return RemoteException made from passed <code>e</code>
@@ -116,5 +123,4 @@ public class AsyncServerResponseHandler extends 
SimpleChannelInboundHandler<Byte
             e.getPort(), doNotRetry)
         : new RemoteWithExtrasException(innerExceptionClassName, 
e.getStackTrace(), doNotRetry);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ccd8888b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index bfbfa8c..69c8fe2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -30,8 +30,6 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,7 +39,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -161,39 +158,6 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  static class TestFailingRpcServer extends TestRpcServer {
-
-    TestFailingRpcServer() throws IOException {
-      this(new FifoRpcScheduler(CONF, 1), CONF);
-    }
-
-    TestFailingRpcServer(Configuration conf) throws IOException {
-      this(new FifoRpcScheduler(conf, 1), conf);
-    }
-
-    TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws 
IOException {
-      super(scheduler, conf);
-    }
-
-    class FailingConnection extends Connection {
-      public FailingConnection(SocketChannel channel, long lastContact) {
-        super(channel, lastContact);
-      }
-      @Override
-      protected void processRequest(ByteBuffer buf) throws IOException, 
InterruptedException {
-        // this will throw exception after the connection header is read, and 
an RPC is sent
-        // from client
-        throw new DoNotRetryIOException("Failing for test");
-      }
-    }
-
-    @Override
-    protected Connection getConnection(SocketChannel channel, long time) {
-      return new FailingConnection(channel, time);
-    }
-
-  }
-
   protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration 
conf);
 
   /**
@@ -332,8 +296,8 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  /** Tests that RPC max request size is respected from the server side */
-  @Test (timeout = 30000)
+  /** Tests that the rpc scheduler is called when requests arrive. */
+  @Test
   public void testRpcMaxRequestSize() throws IOException, InterruptedException 
{
     Configuration conf = new Configuration(CONF);
     conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
@@ -363,35 +327,6 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  /** Tests that the connection closing is handled by the client with 
outstanding RPC calls */
-  @Test (timeout = 30000)
-  public void testConnectionCloseWithOutstandingRPCs() throws IOException, 
InterruptedException {
-    Configuration conf = new Configuration(CONF);
-
-    RpcServer rpcServer = new TestFailingRpcServer(conf);
-    AbstractRpcClient client = createRpcClient(conf);
-    try {
-      rpcServer.start();
-      MethodDescriptor md = 
SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = 
EchoRequestProto.newBuilder().setMessage("hello").build();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      try {
-        client.call(new PayloadCarryingRpcController(
-          CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, 
param,
-          md.getOutputType().toProto(), User.getCurrent(), address,
-          new MetricsConnection.CallStats());
-        fail("RPC should have failed because server closed connection");
-      } catch(IOException ex) {
-        // pass
-      }
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
   /**
    * Instance of RpcServer that echoes client hostAddress back to client
    */

Reply via email to