This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 3f54408ec8c HBASE-28417 TestBlockingIPC.testBadPreambleHeader 
sometimes fails with broken pipe instead of bad auth (#5740)
3f54408ec8c is described below

commit 3f54408ec8c083d84e6504c91e898009f7776c3b
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Wed Mar 6 16:08:36 2024 +0800

    HBASE-28417 TestBlockingIPC.testBadPreambleHeader sometimes fails with 
broken pipe instead of bad auth (#5740)
    
    Also change the IPC related tests to test different combinations of rpc 
server&client, for example, NettyRpcClient and SimpleRpcServer
    
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    (cherry picked from commit 2306820df8b41d9af5227465ee2cf9e18b8f0b5c)
---
 .../hadoop/hbase/ipc/NettyRpcConnection.java       |  2 +-
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   | 86 ++++++++++++++++++----
 .../apache/hadoop/hbase/ipc/TestBlockingIPC.java   | 55 +++-----------
 .../org/apache/hadoop/hbase/ipc/TestNettyIPC.java  | 40 ++++------
 .../apache/hadoop/hbase/ipc/TestNettyTlsIPC.java   | 24 +++---
 5 files changed, 112 insertions(+), 95 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 1618709fa9b..85f7c0a3e61 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -290,7 +290,7 @@ class NettyRpcConnection extends RpcConnection {
     });
   }
 
-  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) 
throws IOException {
+  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) {
     assert eventLoop.inEventLoop();
     PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
       RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index e4427c1690c..0f0c22baf9f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -56,6 +56,7 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
 import io.opentelemetry.sdk.trace.data.SpanData;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -80,8 +82,10 @@ import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.hamcrest.Matcher;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,14 +118,12 @@ public abstract class AbstractTestIPC {
   private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, 
CELL_BYTES, CELL_BYTES);
 
   protected static final Configuration CONF = HBaseConfiguration.create();
-  static {
-    // Set the default to be the old SimpleRpcServer. Subclasses test it and 
netty.
-    CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
SimpleRpcServer.class.getName());
-  }
 
-  protected abstract RpcServer createRpcServer(Server server, String name,
+  protected RpcServer createRpcServer(Server server, String name,
     List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, 
Configuration conf,
-    RpcScheduler scheduler) throws IOException;
+    RpcScheduler scheduler) throws IOException {
+    return RpcServerFactory.createRpcServer(server, name, services, 
bindAddress, conf, scheduler);
+  }
 
   private RpcServer createRpcServer(String name, 
List<BlockingServiceAndInterface> services,
     InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) 
throws IOException {
@@ -133,6 +135,14 @@ public abstract class AbstractTestIPC {
   @Rule
   public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
 
+  @Parameter(0)
+  public Class<? extends RpcServer> rpcServerImpl;
+
+  @Before
+  public void setUpBeforeTest() {
+    CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
rpcServerImpl, RpcServer.class);
+  }
+
   /**
    * Ensure we do not HAVE TO HAVE a codec.
    */
@@ -348,9 +358,43 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  protected abstract RpcServer createTestFailingRpcServer(final String name,
+  private static class FailingSimpleRpcServer extends SimpleRpcServer {
+
+    FailingSimpleRpcServer(Server server, String name,
+      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
+      Configuration conf, RpcScheduler scheduler) throws IOException {
+      super(server, name, services, bindAddress, conf, scheduler, true);
+    }
+
+    final class FailingConnection extends SimpleServerRpcConnection {
+      private FailingConnection(FailingSimpleRpcServer rpcServer, 
SocketChannel channel,
+        long lastContact) {
+        super(rpcServer, channel, lastContact);
+      }
+
+      @Override
+      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
+        // this will throw exception after the connection header is read, and 
an RPC is sent
+        // from client
+        throw new DoNotRetryIOException("Failing for test");
+      }
+    }
+
+    @Override
+    protected SimpleServerRpcConnection getConnection(SocketChannel channel, 
long time) {
+      return new FailingConnection(this, channel, time);
+    }
+  }
+
+  protected RpcServer createTestFailingRpcServer(final String name,
     final List<BlockingServiceAndInterface> services, final InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException;
+    Configuration conf, RpcScheduler scheduler) throws IOException {
+    if (rpcServerImpl.equals(NettyRpcServer.class)) {
+      return new FailingNettyRpcServer(null, name, services, bindAddress, 
conf, scheduler);
+    } else {
+      return new FailingSimpleRpcServer(null, name, services, bindAddress, 
conf, scheduler);
+    }
+  }
 
   /** Tests that the connection closing is handled by the client with 
outstanding RPC calls */
   @Test
@@ -570,19 +614,33 @@ public abstract class AbstractTestIPC {
 
   protected abstract AbstractRpcClient<?> createBadAuthRpcClient(Configuration 
conf);
 
+  private IOException doBadPreableHeaderCall(BlockingInterface stub) {
+    ServiceException se = assertThrows(ServiceException.class,
+      () -> stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build()));
+    return ProtobufUtil.handleRemoteException(se);
+  }
+
   @Test
-  public void testBadPreambleHeader() throws IOException, ServiceException {
+  public void testBadPreambleHeader() throws Exception {
     Configuration clientConf = new Configuration(CONF);
     RpcServer rpcServer = createRpcServer("testRpcServer", 
Collections.emptyList(),
       new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 
1));
     try (AbstractRpcClient<?> client = createBadAuthRpcClient(clientConf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, 
rpcServer.getListenerAddress());
-      ServiceException se = assertThrows(ServiceException.class,
-        () -> stub.echo(null, 
EchoRequestProto.newBuilder().setMessage("hello").build()));
-      IOException ioe = ProtobufUtil.handleRemoteException(se);
-      assertThat(ioe, instanceOf(BadAuthException.class));
-      assertThat(ioe.getMessage(), containsString("authName=unknown"));
+      BadAuthException error = null;
+      // for SimpleRpcServer, it is possible that we get a broken pipe before 
getting the
+      // BadAuthException, so we add some retries here, see HBASE-28417
+      for (int i = 0; i < 10; i++) {
+        IOException ioe = doBadPreableHeaderCall(stub);
+        if (ioe instanceof BadAuthException) {
+          error = (BadAuthException) ioe;
+          break;
+        }
+        Thread.sleep(100);
+      }
+      assertNotNull("Can not get expected BadAuthException", error);
+      assertThat(error.getMessage(), containsString("authName=unknown"));
     } finally {
       rpcServer.stop();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
index e60cc879fd4..24177f28c40 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -18,20 +18,20 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 @Category({ RPCTests.class, MediumTests.class })
 public class TestBlockingIPC extends AbstractTestIPC {
 
@@ -39,11 +39,10 @@ public class TestBlockingIPC extends AbstractTestIPC {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockingIPC.class);
 
-  @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return RpcServerFactory.createRpcServer(server, name, services, 
bindAddress, conf, scheduler);
+  @Parameters(name = "{index}: rpcServerImpl={0}")
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[] { SimpleRpcServer.class },
+      new Object[] { NettyRpcServer.class });
   }
 
   @Override
@@ -73,41 +72,6 @@ public class TestBlockingIPC extends AbstractTestIPC {
     };
   }
 
-  private static class TestFailingRpcServer extends SimpleRpcServer {
-
-    TestFailingRpcServer(Server server, String name,
-      List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-      Configuration conf, RpcScheduler scheduler) throws IOException {
-      super(server, name, services, bindAddress, conf, scheduler, true);
-    }
-
-    final class FailingConnection extends SimpleServerRpcConnection {
-      private FailingConnection(TestFailingRpcServer rpcServer, SocketChannel 
channel,
-        long lastContact) {
-        super(rpcServer, channel, lastContact);
-      }
-
-      @Override
-      public void processRequest(ByteBuff buf) throws IOException, 
InterruptedException {
-        // this will throw exception after the connection header is read, and 
an RPC is sent
-        // from client
-        throw new DoNotRetryIOException("Failing for test");
-      }
-    }
-
-    @Override
-    protected SimpleServerRpcConnection getConnection(SocketChannel channel, 
long time) {
-      return new FailingConnection(this, channel, time);
-    }
-  }
-
-  @Override
-  protected RpcServer createTestFailingRpcServer(String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new TestFailingRpcServer(null, name, services, bindAddress, conf, 
scheduler);
-  }
-
   @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new BlockingRpcClient(conf) {
@@ -124,7 +88,6 @@ public class TestBlockingIPC extends AbstractTestIPC {
           }
         };
       }
-
     };
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index a1b60e2cfa4..f2366a20fd2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -18,13 +18,10 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
@@ -51,18 +48,27 @@ public class TestNettyIPC extends AbstractTestIPC {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestNettyIPC.class);
 
-  @Parameters(name = "{index}: EventLoop={0}")
-  public static Collection<Object[]> parameters() {
-    List<Object[]> params = new ArrayList<>();
-    params.add(new Object[] { "nio" });
-    params.add(new Object[] { "perClientNio" });
+  private static List<String> getEventLoopTypes() {
+    List<String> types = new ArrayList<>();
+    types.add("nio");
+    types.add("perClientNio");
     if (JVM.isLinux() && JVM.isAmd64()) {
-      params.add(new Object[] { "epoll" });
+      types.add("epoll");
+    }
+    return types;
+  }
+
+  @Parameters(name = "{index}: rpcServerImpl={0}, EventLoop={1}")
+  public static List<Object[]> parameters() {
+    List<Object[]> params = new ArrayList<>();
+    for (String eventLoopType : getEventLoopTypes()) {
+      params.add(new Object[] { SimpleRpcServer.class, eventLoopType });
+      params.add(new Object[] { NettyRpcServer.class, eventLoopType });
     }
     return params;
   }
 
-  @Parameter
+  @Parameter(1)
   public String eventLoopType;
 
   private static NioEventLoopGroup NIO;
@@ -103,13 +109,6 @@ public class TestNettyIPC extends AbstractTestIPC {
     }
   }
 
-  @Override
-  protected RpcServer createRpcServer(Server server, String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new NettyRpcServer(server, name, services, bindAddress, conf, 
scheduler, true);
-  }
-
   @Override
   protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
     setConf(conf);
@@ -141,13 +140,6 @@ public class TestNettyIPC extends AbstractTestIPC {
     };
   }
 
-  @Override
-  protected RpcServer createTestFailingRpcServer(String name,
-    List<RpcServer.BlockingServiceAndInterface> services, InetSocketAddress 
bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new FailingNettyRpcServer(null, name, services, bindAddress, conf, 
scheduler);
-  }
-
   @Override
   protected AbstractRpcClient<?> createBadAuthRpcClient(Configuration conf) {
     return new NettyRpcClient(conf) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
index 1cbf6be26c6..00a6b23336a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
@@ -67,37 +67,41 @@ public class TestNettyTlsIPC extends AbstractTestIPC {
 
   private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
 
-  @Parameterized.Parameter(0)
+  @Parameterized.Parameter(1)
   public X509KeyType caKeyType;
 
-  @Parameterized.Parameter(1)
+  @Parameterized.Parameter(2)
   public X509KeyType certKeyType;
 
-  @Parameterized.Parameter(2)
+  @Parameterized.Parameter(3)
   public char[] keyPassword;
 
-  @Parameterized.Parameter(3)
+  @Parameterized.Parameter(4)
   public boolean acceptPlainText;
 
-  @Parameterized.Parameter(4)
+  @Parameterized.Parameter(5)
   public boolean clientTlsEnabled;
 
   private X509TestContext x509TestContext;
 
+  // only netty rpc server supports TLS, so here we will only test 
NettyRpcServer
   @Parameterized.Parameters(
-      name = "{index}: caKeyType={0}, certKeyType={1}, keyPassword={2}, 
acceptPlainText={3},"
-        + " clientTlsEnabled={4}")
+      name = "{index}: rpcServerImpl={0}, caKeyType={1}, certKeyType={2}, 
keyPassword={3},"
+        + " acceptPlainText={4}, clientTlsEnabled={5}")
   public static List<Object[]> data() {
     List<Object[]> params = new ArrayList<>();
     for (X509KeyType caKeyType : X509KeyType.values()) {
       for (X509KeyType certKeyType : X509KeyType.values()) {
         for (char[] keyPassword : new char[][] { "".toCharArray(), 
"pa$$w0rd".toCharArray() }) {
           // do not accept plain text
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, 
false, true });
+          params.add(new Object[] { NettyRpcServer.class, caKeyType, 
certKeyType, keyPassword,
+            false, true });
           // support plain text and client enables tls
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, 
true });
+          params.add(
+            new Object[] { NettyRpcServer.class, caKeyType, certKeyType, 
keyPassword, true, true });
           // support plain text and client disables tls
-          params.add(new Object[] { caKeyType, certKeyType, keyPassword, true, 
false });
+          params.add(new Object[] { NettyRpcServer.class, caKeyType, 
certKeyType, keyPassword, true,
+            false });
         }
       }
     }

Reply via email to