This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new ee068ba8470 HBASE-28321 RpcConnectionRegistry is broken when security 
is enabled and we use different principal for master and region server (#5688)
ee068ba8470 is described below

commit ee068ba8470dc7303a18f753dd8a6db66e5ac44c
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Sun Feb 25 20:50:13 2024 +0800

    HBASE-28321 RpcConnectionRegistry is broken when security is enabled and we 
use different principal for master and region server (#5688)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    (cherry picked from commit c4a02f7fcd1d74385b87bb761d25b118ce080119)
---
 .../hadoop/hbase/client/ConnectionUtils.java       |   2 +-
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |   7 +
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    | 104 ++++++--
 .../java/org/apache/hadoop/hbase/ipc/Call.java     |   8 +-
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |   5 +
 .../hadoop/hbase/ipc/NettyRpcConnection.java       | 127 ++++++++--
 .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java    |   2 +-
 .../hadoop/hbase/ipc/PreambleCallHandler.java      |  95 +++++++
 .../org/apache/hadoop/hbase/ipc/RpcClient.java     |   2 +
 .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 137 +++++++++-
 .../hbase/ipc/SecurityNotEnabledException.java     |  34 +++
 .../hbase/security/AbstractHBaseSaslRpcClient.java |  36 +--
 .../hadoop/hbase/security/HBaseSaslRpcClient.java  |   8 +-
 .../hbase/security/NettyHBaseSaslRpcClient.java    |   4 +-
 .../security/NettyHBaseSaslRpcClientHandler.java   |   4 +-
 .../apache/hadoop/hbase/security/SecurityInfo.java |  30 ++-
 .../DigestSaslClientAuthenticationProvider.java    |   7 +-
 .../GssSaslClientAuthenticationProvider.java       |  45 +---
 .../provider/SaslClientAuthenticationProvider.java |  31 ++-
 .../SimpleSaslClientAuthenticationProvider.java    |   3 +-
 .../hbase/security/TestHBaseSaslRpcClient.java     |  30 +--
 .../ShadeSaslClientAuthenticationProvider.java     |   7 +-
 .../src/main/protobuf/rpc/RPC.proto                |   4 +
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |   7 +-
 .../hadoop/hbase/ipc/ServerRpcConnection.java      |  32 ++-
 .../hbase/ipc/SimpleServerRpcConnection.java       |   2 +-
 .../hbase/ipc/TestMultipleServerPrincipalsIPC.java | 277 +++++++++++++++++++++
 .../hbase/ipc/TestRpcSkipInitialSaslHandshake.java |   6 +-
 .../hbase/ipc/TestSecurityRpcSentBytesMetrics.java |   5 +-
 .../hbase/security/AbstractTestSecureIPC.java      |  12 +-
 ...stMultipleServerPrincipalsFallbackToSimple.java | 189 ++++++++++++++
 .../security/TestSaslTlsIPCRejectPlainText.java    |   5 +-
 .../CustomSaslAuthenticationProviderTestBase.java  |   3 +-
 33 files changed, 1084 insertions(+), 186 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index d073fef929f..84acc6e4d39 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -665,7 +665,7 @@ public final class ConnectionUtils {
     }
   }
 
-  static boolean isUnexpectedPreambleHeaderException(IOException e) {
+  public static boolean isUnexpectedPreambleHeaderException(IOException e) {
     if (!(e instanceof RemoteException)) {
       return false;
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 7972cc08acd..3742eb8118a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
@@ -542,6 +543,12 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
     return new RpcChannelImplementation(this, createAddr(sn), user, 
rpcTimeout);
   }
 
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  PoolMap<ConnectionId, T> getConnections() {
+    return connections;
+  }
+
   private static class AbstractRpcChannel {
 
     protected final Address addr;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 3f1418aa984..4b3d2de466b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -37,12 +37,14 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayDeque;
 import java.util.Locale;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.security.sasl.SaslException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
@@ -352,13 +354,13 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
     }
   }
 
-  private boolean setupSaslConnection(final InputStream in2, final 
OutputStream out2)
-    throws IOException {
+  private boolean setupSaslConnection(final InputStream in2, final 
OutputStream out2,
+    String serverPrincipal) throws IOException {
     if (this.metrics != null) {
       this.metrics.incrNsLookups();
     }
     saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, 
token,
-      socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
+      socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed,
       this.rpcClient.conf.get("hbase.rpc.protection",
         QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
       this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, 
CRYPTO_AES_ENABLED_DEFAULT));
@@ -379,7 +381,8 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
    * </p>
    */
   private void handleSaslConnectionFailure(final int currRetries, final int 
maxRetries,
-    final Exception ex, final UserGroupInformation user) throws IOException, 
InterruptedException {
+    final Exception ex, final UserGroupInformation user, final String 
serverPrincipal)
+    throws IOException, InterruptedException {
     closeSocket();
     user.doAs(new PrivilegedExceptionAction<Object>() {
       @Override
@@ -419,17 +422,22 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
           Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) 
+ 1);
           return null;
         } else {
-          String msg =
-            "Failed to initiate connection for " + 
UserGroupInformation.getLoginUser().getUserName()
-              + " to " + securityInfo.getServerPrincipal();
+          String msg = "Failed to initiate connection for "
+            + UserGroupInformation.getLoginUser().getUserName() + " to " + 
serverPrincipal;
           throw new IOException(msg, ex);
         }
       }
     });
   }
 
-  private void getConnectionRegistry(OutputStream outStream) throws 
IOException {
+  private void getConnectionRegistry(InputStream inStream, OutputStream 
outStream,
+    Call connectionRegistryCall) throws IOException {
     outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER);
+    readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, 
remoteExc -> {
+      synchronized (this) {
+        closeConn(remoteExc);
+      }
+    });
   }
 
   private void createStreams(InputStream inStream, OutputStream outStream) {
@@ -437,7 +445,52 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
     this.out = new DataOutputStream(new BufferedOutputStream(outStream));
   }
 
-  private void setupIOstreams() throws IOException {
+  // choose the server principal to use
+  private String chooseServerPrincipal(InputStream inStream, OutputStream 
outStream)
+    throws IOException {
+    Set<String> serverPrincipals = getServerPrincipals();
+    if (serverPrincipals.size() == 1) {
+      return serverPrincipals.iterator().next();
+    }
+    // this means we use kerberos authentication and there are multiple server 
principal candidates,
+    // in this way we need to send a special preamble header to get server 
principal from server
+    Call securityPreambleCall = createSecurityPreambleCall(r -> {
+    });
+    outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER);
+    readResponse(new DataInputStream(inStream), calls, securityPreambleCall, 
remoteExc -> {
+      synchronized (this) {
+        closeConn(remoteExc);
+      }
+    });
+    if (securityPreambleCall.error != null) {
+      LOG.debug("Error when trying to do a security preamble call to {}", 
remoteId.address,
+        securityPreambleCall.error);
+      if 
(ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error))
 {
+        // this means we are connecting to an old server which does not 
support the security
+        // preamble call, so we should fall back to randomly selecting a 
principal to use
+        // TODO: find a way to reconnect without failing all the pending 
calls, for now, when we
+        // reach here, shutdown should have already been scheduled
+        throw securityPreambleCall.error;
+      }
+      if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) {
+        // server tells us security is not enabled, then we should check 
whether fallback to
+        // simple is allowed, if so we just go without security, otherwise we 
should fail the
+        // negotiation immediately
+        if (rpcClient.fallbackAllowed) {
+          // TODO: just change the preamble and skip the fallback-to-simple 
logic; for now, just
+          // selecting the first principal can finish the connection setup, but 
it wastes one client
+          // message
+          return serverPrincipals.iterator().next();
+        } else {
+          throw new FallbackDisallowedException();
+        }
+      }
+      return randomSelect(serverPrincipals);
+    }
+    return chooseServerPrincipal(serverPrincipals, securityPreambleCall);
+  }
+
+  private void setupIOstreams(Call connectionRegistryCall) throws IOException {
     if (socket != null) {
       // The connection is already available. Perfect.
       return;
@@ -465,32 +518,37 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
         // This creates a socket with a write timeout. This timeout cannot be 
changed.
         OutputStream outStream = NetUtils.getOutputStream(socket, 
this.rpcClient.writeTO);
         if (connectionRegistryCall != null) {
-          getConnectionRegistry(outStream);
-          createStreams(inStream, outStream);
-          break;
+          getConnectionRegistry(inStream, outStream, connectionRegistryCall);
+          closeSocket();
+          return;
         }
-        // Write out the preamble -- MAGIC, version, and auth to use.
-        writeConnectionHeaderPreamble(outStream);
+
         if (useSasl) {
-          final InputStream in2 = inStream;
-          final OutputStream out2 = outStream;
           UserGroupInformation ticket = provider.getRealUser(remoteId.ticket);
           boolean continueSasl;
           if (ticket == null) {
             throw new FatalConnectionException("ticket/user is null");
           }
+          String serverPrincipal = chooseServerPrincipal(inStream, outStream);
+          // Write out the preamble -- MAGIC, version, and auth to use.
+          writeConnectionHeaderPreamble(outStream);
           try {
+            final InputStream in2 = inStream;
+            final OutputStream out2 = outStream;
             continueSasl = ticket.doAs(new 
PrivilegedExceptionAction<Boolean>() {
               @Override
               public Boolean run() throws IOException {
-                return setupSaslConnection(in2, out2);
+                return setupSaslConnection(in2, out2, serverPrincipal);
               }
             });
           } catch (Exception ex) {
             ExceptionUtil.rethrowIfInterrupt(ex);
-            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, 
ticket);
+            saslNegotiationDone(serverPrincipal, false);
+            handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, 
ticket,
+              serverPrincipal);
             continue;
           }
+          saslNegotiationDone(serverPrincipal, true);
           if (continueSasl) {
             // Sasl connect is successful. Let's set up Sasl i/o streams.
             inStream = saslRpcClient.getInputStream();
@@ -501,6 +559,9 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
             // reconnecting because regionserver may change its sasl config 
after restart.
             saslRpcClient = null;
           }
+        } else {
+          // Write out the preamble -- MAGIC, version, and auth to use.
+          writeConnectionHeaderPreamble(outStream);
         }
         createStreams(inStream, outStream);
         // Now write out the connection header
@@ -618,9 +679,10 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
       }
       RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
       if (call.isConnectionRegistryCall()) {
-        connectionRegistryCall = call;
+        setupIOstreams(call);
+        return;
       }
-      setupIOstreams();
+      setupIOstreams(null);
 
       // Now we're going to write the call. We take the lock, then check that 
the connection
       // is still valid, and, if so we do the write to the socket. If the 
write fails, we don't
@@ -655,7 +717,7 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
    */
   private void readResponse() {
     try {
-      readResponse(in, calls, remoteExc -> {
+      readResponse(in, calls, null, remoteExc -> {
         synchronized (this) {
           closeConn(remoteExc);
         }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index d175ea0b6e9..980e708d235 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc;
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.hadoop.hbase.CellScanner;
@@ -85,16 +84,15 @@ class Call {
    * Builds a simplified {@link #toString()} that includes just the id and 
method name.
    */
   public String toShortString() {
+    // Call[id=32153218,methodName=Get]
     return new ToStringBuilder(this, 
ToStringStyle.SHORT_PREFIX_STYLE).append("id", id)
-      .append("methodName", md.getName()).toString();
+      .append("methodName", md != null ? md.getName() : "").toString();
   }
 
   @Override
   public String toString() {
-    // Call[id=32153218,methodName=Get]
     return new ToStringBuilder(this, 
ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(toShortString())
-      .append("param", 
Optional.ofNullable(param).map(ProtobufUtil::getShortTextFormat).orElse(""))
-      .toString();
+      .append("param", param != null ? ProtobufUtil.getShortTextFormat(param) 
: "").toString();
   }
 
   /**
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index bf4b833e856..42094eb45e0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -178,6 +178,11 @@ class IPCUtil {
     }
   }
 
+  static boolean isSecurityNotEnabledException(IOException e) {
+    return e instanceof RemoteException
+      && SecurityNotEnabledException.class.getName().equals(((RemoteException) 
e).getClassName());
+  }
+
   static IOException toIOE(Throwable t) {
     if (t instanceof IOException) {
       return (IOException) t;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index a0f8f10d1cf..1618709fa9b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -26,10 +26,13 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import javax.security.sasl.SaslException;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
 import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
@@ -157,7 +160,7 @@ class NettyRpcConnection extends RpcConnection {
     });
   }
 
-  private void established(Channel ch) throws IOException {
+  private void established(Channel ch) {
     assert eventLoop.inEventLoop();
     ch.pipeline()
       .addBefore(BufferCallBeforeInitHandler.NAME, null,
@@ -169,6 +172,11 @@ class NettyRpcConnection extends RpcConnection {
       .fireUserEventTriggered(BufferCallEvent.success());
   }
 
+  private void saslEstablished(Channel ch, String serverPrincipal) {
+    saslNegotiationDone(serverPrincipal, true);
+    established(ch);
+  }
+
   private boolean reloginInProgress;
 
   private void scheduleRelogin(Throwable error) {
@@ -201,23 +209,31 @@ class NettyRpcConnection extends RpcConnection {
     // fail all pending calls
     ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
     shutdown0();
+    rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e);
+  }
+
+  private void saslFailInit(Channel ch, String serverPrincipal, IOException 
error) {
+    assert eventLoop.inEventLoop();
+    saslNegotiationDone(serverPrincipal, false);
+    failInit(ch, error);
   }
 
-  private void saslNegotiate(final Channel ch) {
+  private void saslNegotiate(Channel ch, String serverPrincipal) {
     assert eventLoop.inEventLoop();
+    NettyFutureUtils.safeWriteAndFlush(ch, 
connectionHeaderPreamble.retainedDuplicate());
     UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket());
     if (ticket == null) {
-      failInit(ch, new FatalConnectionException("ticket/user is null"));
+      saslFailInit(ch, serverPrincipal, new 
FatalConnectionException("ticket/user is null"));
       return;
     }
     Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
     final NettyHBaseSaslRpcClientHandler saslHandler;
     try {
       saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, 
provider, token,
-        ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
+        ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal,
         rpcClient.fallbackAllowed, this.rpcClient.conf);
     } catch (IOException e) {
-      failInit(ch, e);
+      saslFailInit(ch, serverPrincipal, e);
       return;
     }
     ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new 
SaslChallengeDecoder())
@@ -252,35 +268,99 @@ class NettyRpcConnection extends RpcConnection {
                   p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
                   // don't send connection header, 
NettyHBaseRpcConnectionHeaderHandler
                   // sent it already
-                  established(ch);
+                  saslEstablished(ch, serverPrincipal);
                 } else {
                   final Throwable error = future.cause();
                   scheduleRelogin(error);
-                  failInit(ch, toIOE(error));
+                  saslFailInit(ch, serverPrincipal, toIOE(error));
                 }
               }
             });
           } else {
             // send the connection header to server
             ch.write(connectionHeaderWithLength.retainedDuplicate());
-            established(ch);
+            saslEstablished(ch, serverPrincipal);
           }
         } else {
           final Throwable error = future.cause();
           scheduleRelogin(error);
-          failInit(ch, toIOE(error));
+          saslFailInit(ch, serverPrincipal, toIOE(error));
         }
       }
     });
   }
 
-  private void getConnectionRegistry(Channel ch) throws IOException {
-    established(ch);
-    NettyFutureUtils.safeWriteAndFlush(ch,
-      Unpooled.directBuffer(6).writeBytes(RpcClient.REGISTRY_PREAMBLE_HEADER));
+  private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) 
throws IOException {
+    assert eventLoop.inEventLoop();
+    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
+      RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall);
   }
 
-  private void connect() throws UnknownHostException {
+  private void onSecurityPreambleError(Channel ch, Set<String> 
serverPrincipals,
+    IOException error) {
+    assert eventLoop.inEventLoop();
+    LOG.debug("Error when trying to do a security preamble call to {}", 
remoteId.address, error);
+    if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) {
+      // this means we are connecting to an old server which does not support 
the security
+      // preamble call, so we should fall back to randomly selecting a 
principal to use
+      // TODO: find a way to reconnect without failing all the pending calls, 
for now, when we
+      // reach here, shutdown should have already been scheduled
+      return;
+    }
+    if (IPCUtil.isSecurityNotEnabledException(error)) {
+      // server tells us security is not enabled, then we should check whether 
fallback to
+      // simple is allowed, if so we just go without security, otherwise we 
should fail the
+      // negotiation immediately
+      if (rpcClient.fallbackAllowed) {
+        // TODO: just change the preamble and skip the fallback-to-simple 
logic; for now, just
+        // selecting the first principal can finish the connection setup, but 
it wastes one client
+        // message
+        saslNegotiate(ch, serverPrincipals.iterator().next());
+      } else {
+        failInit(ch, new FallbackDisallowedException());
+      }
+      return;
+    }
+    // usually we should not reach here, but for robustness, just randomly select
+    // a principal to
+    // connect
+    saslNegotiate(ch, randomSelect(serverPrincipals));
+  }
+
+  private void onSecurityPreambleFinish(Channel ch, Set<String> 
serverPrincipals,
+    Call securityPreambleCall) {
+    assert eventLoop.inEventLoop();
+    String serverPrincipal;
+    try {
+      serverPrincipal = chooseServerPrincipal(serverPrincipals, 
securityPreambleCall);
+    } catch (SaslException e) {
+      failInit(ch, e);
+      return;
+    }
+    saslNegotiate(ch, serverPrincipal);
+  }
+
+  private void saslNegotiate(Channel ch) throws IOException {
+    assert eventLoop.inEventLoop();
+    Set<String> serverPrincipals = getServerPrincipals();
+    if (serverPrincipals.size() == 1) {
+      saslNegotiate(ch, serverPrincipals.iterator().next());
+      return;
+    }
+    // this means we use kerberos authentication and there are multiple server 
principal candidates,
+    // in this way we need to send a special preamble header to get server 
principal from server
+    Call securityPreambleCall = createSecurityPreambleCall(call -> {
+      assert eventLoop.inEventLoop();
+      if (call.error != null) {
+        onSecurityPreambleError(ch, serverPrincipals, call.error);
+      } else {
+        onSecurityPreambleFinish(ch, serverPrincipals, call);
+      }
+    });
+    PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this,
+      RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall);
+  }
+
+  private void connect(Call connectionRegistryCall) throws 
UnknownHostException {
     assert eventLoop.inEventLoop();
     LOG.trace("Connecting to {}", remoteId.getAddress());
     InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
@@ -310,16 +390,17 @@ class NettyRpcConnection extends RpcConnection {
 
         private void succeed(Channel ch) throws IOException {
           if (connectionRegistryCall != null) {
-            getConnectionRegistry(ch);
+            getConnectionRegistry(ch, connectionRegistryCall);
             return;
           }
-          NettyFutureUtils.safeWriteAndFlush(ch, 
connectionHeaderPreamble.retainedDuplicate());
-          if (useSasl) {
-            saslNegotiate(ch);
-          } else {
-            // send the connection header to server
+          if (!useSasl) {
+            // BufferCallBeforeInitHandler will call ctx.flush when receiving 
the
+            // BufferCallEvent.success() event, so here we just use write for 
the below two messages
+            NettyFutureUtils.safeWrite(ch, 
connectionHeaderPreamble.retainedDuplicate());
             NettyFutureUtils.safeWrite(ch, 
connectionHeaderWithLength.retainedDuplicate());
             established(ch);
+          } else {
+            saslNegotiate(ch);
           }
         }
 
@@ -331,7 +412,6 @@ class NettyRpcConnection extends RpcConnection {
             connectionRegistryCall.setException(ex);
           }
           failInit(ch, ex);
-          rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), 
error);
         }
 
         @Override
@@ -360,10 +440,9 @@ class NettyRpcConnection extends RpcConnection {
   private void sendRequest0(Call call, HBaseRpcController hrc) throws 
IOException {
     assert eventLoop.inEventLoop();
     if (call.isConnectionRegistryCall()) {
-      connectionRegistryCall = call;
       // For get connection registry call, we will send a special preamble 
header to get the
       // response, instead of sending a real rpc call. See HBASE-25051
-      connect();
+      connect(call);
       return;
     }
     if (reloginInProgress) {
@@ -386,7 +465,7 @@ class NettyRpcConnection extends RpcConnection {
           setCancelled(call);
         } else {
           if (channel == null) {
-            connect();
+            connect(null);
           }
           scheduleTimeoutTask(call);
           channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 44772ae2dbf..ef3752bbf47 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -122,7 +122,7 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
 
   private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws 
IOException {
     try {
-      conn.readResponse(new ByteBufInputStream(buf), id2Call,
+      conn.readResponse(new ByteBufInputStream(buf), id2Call, null,
         remoteExc -> exceptionCaught(ctx, remoteExc));
     } catch (IOException e) {
       // In netty, the decoding the frame based, when reaching here we have 
already read a full
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java
new file mode 100644
index 00000000000..1b2a7cf6ace
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.util.NettyFutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import 
org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
+
+/**
+ * Used to decode preamble calls.
+ */
+@InterfaceAudience.Private
+class PreambleCallHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+  private final NettyRpcConnection conn;
+
+  private final byte[] preambleHeader;
+
+  private final Call preambleCall;
+
+  PreambleCallHandler(NettyRpcConnection conn, byte[] preambleHeader, Call 
preambleCall) {
+    this.conn = conn;
+    this.preambleHeader = preambleHeader;
+    this.preambleCall = preambleCall;
+  }
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    NettyFutureUtils.safeWriteAndFlush(ctx,
+      Unpooled.directBuffer(preambleHeader.length).writeBytes(preambleHeader));
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws 
Exception {
+    try {
+      conn.readResponse(new ByteBufInputStream(buf), new HashMap<>(), 
preambleCall,
+        remoteExc -> exceptionCaught(ctx, remoteExc));
+    } finally {
+      ChannelPipeline p = ctx.pipeline();
+      p.remove("PreambleCallReadTimeoutHandler");
+      p.remove("PreambleCallFrameDecoder");
+      p.remove(this);
+    }
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    preambleCall.setException(new ConnectionClosedException("Connection 
closed"));
+    ctx.fireChannelInactive();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    preambleCall.setException(IPCUtil.toIOE(cause));
+  }
+
+  public static void setup(ChannelPipeline pipeline, int readTimeoutMs, 
NettyRpcConnection conn,
+    byte[] preambleHeader, Call preambleCall) {
+    // we do not use single decode here, as for a preamble call, we do not 
expect the server side
+    // to return multiple responses
+    pipeline
+      .addBefore(BufferCallBeforeInitHandler.NAME, 
"PreambleCallReadTimeoutHandler",
+        new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS))
+      .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallFrameDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
+      .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallHandler",
+        new PreambleCallHandler(conn, preambleHeader, preambleCall));
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 369430e337a..7011dc5e139 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -56,6 +56,8 @@ public interface RpcClient extends Closeable {
 
   byte[] REGISTRY_PREAMBLE_HEADER = new byte[] { 'R', 'e', 'g', 'i', 's', 't' 
};
 
+  byte[] SECURITY_PREAMBLE_HEADER = new byte[] { 'S', 'e', 'c', 'u', 'r', 'i' 
};
+
   /**
    * Creates a "channel" that can be used by a blocking protobuf service. 
Useful setting up protobuf
    * blocking stubs.
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 65f936d6fc3..8017e99ec4f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -21,17 +21,26 @@ import java.io.DataInput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import javax.security.sasl.SaslException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
@@ -40,6 +49,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -47,6 +57,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
@@ -58,6 +69,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 
 /**
@@ -100,6 +112,12 @@ abstract class RpcConnection {
 
   protected SaslClientAuthenticationProvider provider;
 
+  // Record the server principal with which we have successfully authenticated to 
the remote server.
+  // This is used to save an extra round trip with the server when there are 
multiple candidate server
+  // principals for a given rpc service, like ClientMetaService.
+  // See HBASE-28321 for more details.
+  private String lastSucceededServerPrincipal;
+
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, 
ConnectionId remoteId,
     String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec 
compressor,
     CellBlockBuilder cellBlockBuilder, MetricsConnection metrics,
@@ -221,6 +239,96 @@ abstract class RpcConnection {
     return remoteAddr;
   }
 
+  private static boolean useCanonicalHostname(Configuration conf) {
+    return !conf.getBoolean(
+      
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS,
+      
SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS);
+  }
+
+  private static String getHostnameForServerPrincipal(Configuration conf, 
InetAddress addr) {
+    final String hostname;
+    if (useCanonicalHostname(conf)) {
+      hostname = addr.getCanonicalHostName();
+      if (hostname.equals(addr.getHostAddress())) {
+        LOG.warn("Canonical hostname for SASL principal is the same with IP 
address: " + hostname
+          + ", " + addr.getHostName() + ". Check DNS configuration or consider 
"
+          + 
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + 
"=true");
+      }
+    } else {
+      hostname = addr.getHostName();
+    }
+
+    return hostname.toLowerCase();
+  }
+
+  private static String getServerPrincipal(Configuration conf, String 
serverKey, InetAddress server)
+    throws IOException {
+    String hostname = getHostnameForServerPrincipal(conf, server);
+    return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname);
+  }
+
+  protected final boolean isKerberosAuth() {
+    return provider.getSaslAuthMethod().getCode() == AuthMethod.KERBEROS.code;
+  }
+
+  protected final Set<String> getServerPrincipals() throws IOException {
+    // for authentication method other than kerberos, we do not need to know 
the server principal
+    if (!isKerberosAuth()) {
+      return Collections.singleton(HConstants.EMPTY_STRING);
+    }
+    // if we have successfully authenticated last time, just return the server 
principal we used last
+    // time
+    if (lastSucceededServerPrincipal != null) {
+      return Collections.singleton(lastSucceededServerPrincipal);
+    }
+    InetAddress server =
+      new InetSocketAddress(remoteId.address.getHostName(), 
remoteId.address.getPort())
+        .getAddress();
+    // Even if we have multiple config keys in security info, it is still 
possible that we configured
+    // the same principal for them, so here we use a Set
+    Set<String> serverPrincipals = new TreeSet<>();
+    for (String serverPrincipalKey : securityInfo.getServerPrincipals()) {
+      serverPrincipals.add(getServerPrincipal(conf, serverPrincipalKey, 
server));
+    }
+    return serverPrincipals;
+  }
+
+  protected final <T> T randomSelect(Collection<T> c) {
+    int select = ThreadLocalRandom.current().nextInt(c.size());
+    int index = 0;
+    for (T t : c) {
+      if (index == select) {
+        return t;
+      }
+      index++;
+    }
+    return null;
+  }
+
+  protected final String chooseServerPrincipal(Set<String> candidates, Call 
securityPreambleCall)
+    throws SaslException {
+    String principal =
+      ((SecurityPreamableResponse) 
securityPreambleCall.response).getServerPrincipal();
+    if (!candidates.contains(principal)) {
+      // this means the server returned a principal which is not among our 
candidates; it could be a
+      // malicious server, so stop connecting
+      throw new SaslException(remoteId.address + " tells us to use server 
principal " + principal
+        + " which is not expected, should be one of " + candidates);
+    }
+    return principal;
+  }
+
+  protected final void saslNegotiationDone(String serverPrincipal, boolean 
succeed) {
+    LOG.debug("sasl negotiation done with serverPrincipal = {}, succeed = {}", 
serverPrincipal,
+      succeed);
+    if (succeed) {
+      this.lastSucceededServerPrincipal = serverPrincipal;
+    } else {
+      // clear the recorded principal if authentication failed
+      this.lastSucceededServerPrincipal = null;
+    }
+  }
+
   protected abstract void callTimeout(Call call);
 
   public ConnectionId remoteId() {
@@ -252,7 +360,10 @@ abstract class RpcConnection {
    */
   public abstract void cleanupConnection();
 
-  protected Call connectionRegistryCall;
+  protected final Call createSecurityPreambleCall(RpcCallback<Call> callback) {
+    return new Call(-1, null, null, null, 
SecurityPreamableResponse.getDefaultInstance(), 0, 0,
+      Collections.emptyMap(), callback, MetricsConnection.newCallStats());
+  }
 
   private <T extends InputStream & DataInput> void finishCall(ResponseHeader 
responseHeader, T in,
     Call call) throws IOException {
@@ -286,7 +397,7 @@ abstract class RpcConnection {
   }
 
   <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, 
Call> id2Call,
-    Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException 
{
+    Call preambleCall, Consumer<RemoteException> fatalConnectionErrorConsumer) 
throws IOException {
     int totalSize = in.readInt();
     ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
     int id = responseHeader.getCallId();
@@ -301,9 +412,8 @@ abstract class RpcConnection {
       if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
         // Here we will cleanup all calls so do not need to fall back, just 
return.
         fatalConnectionErrorConsumer.accept(remoteExc);
-        if (connectionRegistryCall != null) {
-          connectionRegistryCall.setException(remoteExc);
-          connectionRegistryCall = null;
+        if (preambleCall != null) {
+          preambleCall.setException(remoteExc);
         }
         return;
       }
@@ -311,10 +421,19 @@ abstract class RpcConnection {
       remoteExc = null;
     }
     if (id < 0) {
-      if (connectionRegistryCall != null) {
-        LOG.debug("process connection registry call");
-        finishCall(responseHeader, in, connectionRegistryCall);
-        connectionRegistryCall = null;
+      LOG.debug("process preamble call response with response type {}",
+        preambleCall != null
+          ? preambleCall.responseDefaultType.getDescriptorForType().getName()
+          : "null");
+      if (preambleCall == null) {
+        // fall through so later we will skip this response
+        LOG.warn("Got a negative call id {} but there is no preamble call", 
id);
+      } else {
+        if (remoteExc != null) {
+          preambleCall.setException(remoteExc);
+        } else {
+          finishCall(responseHeader, in, preambleCall);
+        }
         return;
       }
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java
new file mode 100644
index 00000000000..207188de8c6
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrown when the server receives a security preamble call asking for the 
server principal but
+ * security is not enabled on this server.
+ * <p>
+ * This exception will not be thrown to upper layers, so it is marked as IA.Private.
+@InterfaceAudience.Private
+public class SecurityNotEnabledException extends HBaseIOException {
+
+  private static final long serialVersionUID = -3682812966232247662L;
+
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
index 87b2287a601..4e6f2eab478 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java
@@ -45,38 +45,38 @@ public abstract class AbstractHBaseSaslRpcClient {
 
   /**
    * Create a HBaseSaslRpcClient for an authentication method
-   * @param conf            the configuration object
-   * @param provider        the authentication provider
-   * @param token           token to use if needed by the authentication method
-   * @param serverAddr      the address of the hbase service
-   * @param securityInfo    the security details for the remote hbase service
-   * @param fallbackAllowed does the client allow fallback to simple 
authentication
+   * @param conf             the configuration object
+   * @param provider         the authentication provider
+   * @param token            token to use if needed by the authentication 
method
+   * @param serverAddr       the address of the hbase service
+   * @param servicePrincipal the service principal to use if needed by the 
authentication method
+   * @param fallbackAllowed  does the client allow fallback to simple 
authentication
    */
   protected AbstractHBaseSaslRpcClient(Configuration conf,
     SaslClientAuthenticationProvider provider, Token<? extends 
TokenIdentifier> token,
-    InetAddress serverAddr, SecurityInfo securityInfo, boolean 
fallbackAllowed) throws IOException {
-    this(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, 
"authentication");
+    InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed) 
throws IOException {
+    this(conf, provider, token, serverAddr, servicePrincipal, fallbackAllowed, 
"authentication");
   }
 
   /**
    * Create a HBaseSaslRpcClient for an authentication method
-   * @param conf            configuration object
-   * @param provider        the authentication provider
-   * @param token           token to use if needed by the authentication method
-   * @param serverAddr      the address of the hbase service
-   * @param securityInfo    the security details for the remote hbase service
-   * @param fallbackAllowed does the client allow fallback to simple 
authentication
-   * @param rpcProtection   the protection level ("authentication", 
"integrity" or "privacy")
+   * @param conf             configuration object
+   * @param provider         the authentication provider
+   * @param token            token to use if needed by the authentication 
method
+   * @param serverAddr       the address of the hbase service
+   * @param servicePrincipal the service principal to use if needed by the 
authentication method
+   * @param fallbackAllowed  does the client allow fallback to simple 
authentication
+   * @param rpcProtection    the protection level ("authentication", 
"integrity" or "privacy")
    */
   protected AbstractHBaseSaslRpcClient(Configuration conf,
     SaslClientAuthenticationProvider provider, Token<? extends 
TokenIdentifier> token,
-    InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed,
-    String rpcProtection) throws IOException {
+    InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed, 
String rpcProtection)
+    throws IOException {
     this.fallbackAllowed = fallbackAllowed;
     saslProps = SaslUtil.initSaslProperties(rpcProtection);
 
     saslClient =
-      provider.createClient(conf, serverAddr, securityInfo, token, 
fallbackAllowed, saslProps);
+      provider.createClient(conf, serverAddr, servicePrincipal, token, 
fallbackAllowed, saslProps);
     if (saslClient == null) {
       throw new IOException(
         "Authentication provider " + provider.getClass() + " returned a null 
SaslClient");
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index ace1c38ab22..ebf0a7f875f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -63,15 +63,15 @@ public class HBaseSaslRpcClient extends 
AbstractHBaseSaslRpcClient {
   private boolean initStreamForCrypto;
 
   public HBaseSaslRpcClient(Configuration conf, 
SaslClientAuthenticationProvider provider,
-    Token<? extends TokenIdentifier> token, InetAddress serverAddr, 
SecurityInfo securityInfo,
+    Token<? extends TokenIdentifier> token, InetAddress serverAddr, String 
servicePrincipal,
     boolean fallbackAllowed) throws IOException {
-    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed);
+    super(conf, provider, token, serverAddr, servicePrincipal, 
fallbackAllowed);
   }
 
   public HBaseSaslRpcClient(Configuration conf, 
SaslClientAuthenticationProvider provider,
-    Token<? extends TokenIdentifier> token, InetAddress serverAddr, 
SecurityInfo securityInfo,
+    Token<? extends TokenIdentifier> token, InetAddress serverAddr, String 
servicePrincipal,
     boolean fallbackAllowed, String rpcProtection, boolean 
initStreamForCrypto) throws IOException {
-    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, 
rpcProtection);
+    super(conf, provider, token, serverAddr, servicePrincipal, 
fallbackAllowed, rpcProtection);
     this.initStreamForCrypto = initStreamForCrypto;
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
index fe5481a10b2..47d380d7104 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -40,9 +40,9 @@ public class NettyHBaseSaslRpcClient extends 
AbstractHBaseSaslRpcClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(NettyHBaseSaslRpcClient.class);
 
   public NettyHBaseSaslRpcClient(Configuration conf, 
SaslClientAuthenticationProvider provider,
-    Token<? extends TokenIdentifier> token, InetAddress serverAddr, 
SecurityInfo securityInfo,
+    Token<? extends TokenIdentifier> token, InetAddress serverAddr, String 
serverPrincipal,
     boolean fallbackAllowed, String rpcProtection) throws IOException {
-    super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, 
rpcProtection);
+    super(conf, provider, token, serverAddr, serverPrincipal, fallbackAllowed, 
rpcProtection);
   }
 
   public void setupSaslHandler(ChannelPipeline p, String addAfter) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
index cc71355d429..567b5675b71 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -68,14 +68,14 @@ public class NettyHBaseSaslRpcClientHandler extends 
SimpleChannelInboundHandler<
    */
   public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, 
UserGroupInformation ugi,
     SaslClientAuthenticationProvider provider, Token<? extends 
TokenIdentifier> token,
-    InetAddress serverAddr, SecurityInfo securityInfo, boolean 
fallbackAllowed, Configuration conf)
+    InetAddress serverAddr, String serverPrincipal, boolean fallbackAllowed, 
Configuration conf)
     throws IOException {
     this.saslPromise = saslPromise;
     this.ugi = ugi;
     this.conf = conf;
     this.provider = provider;
     this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, 
serverAddr,
-      securityInfo, fallbackAllowed, conf.get("hbase.rpc.protection",
+      serverPrincipal, fallbackAllowed, conf.get("hbase.rpc.protection",
         SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
index 2e16d564695..a33f49573de 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hbase.security;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos;
@@ -51,7 +55,8 @@ public class SecurityInfo {
     infos.put(MasterProtos.HbckService.getDescriptor().getName(),
       new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
     infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(),
-      new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
+      new SecurityInfo(Kind.HBASE_AUTH_TOKEN, 
SecurityConstants.MASTER_KRB_PRINCIPAL,
+        SecurityConstants.REGIONSERVER_KRB_PRINCIPAL));
     
infos.put(BootstrapNodeProtos.BootstrapNodeService.getDescriptor().getName(),
       new SecurityInfo(SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, 
Kind.HBASE_AUTH_TOKEN));
     infos.put(LockServiceProtos.LockService.getDescriptor().getName(),
@@ -75,16 +80,33 @@ public class SecurityInfo {
     return infos.get(serviceName);
   }
 
-  private final String serverPrincipal;
+  private final List<String> serverPrincipals;
   private final Kind tokenKind;
 
   public SecurityInfo(String serverPrincipal, Kind tokenKind) {
-    this.serverPrincipal = serverPrincipal;
+    this(tokenKind, serverPrincipal);
+  }
+
+  public SecurityInfo(Kind tokenKind, String... serverPrincipal) {
+    Preconditions.checkArgument(serverPrincipal.length > 0);
     this.tokenKind = tokenKind;
+    this.serverPrincipals = Arrays.asList(serverPrincipal);
   }
 
+  /**
+   * Although this class is IA.Private, we leak this class in
+   * {@code SaslClientAuthenticationProvider}, so need to align with the 
deprecation cycle for that
+   * class.
+   * @deprecated Since 2.5.8 and 2.6.0, will be removed in 4.0.0. Use {@link 
#getServerPrincipals()}
+   *             instead.
+   */
+  @Deprecated
   public String getServerPrincipal() {
-    return serverPrincipal;
+    return serverPrincipals.get(0);
+  }
+
+  public List<String> getServerPrincipals() {
+    return serverPrincipals;
   }
 
   public Kind getTokenKind() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
index 480e724599b..65893c1a75c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java
@@ -31,7 +31,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -46,9 +45,9 @@ public class DigestSaslClientAuthenticationProvider extends 
DigestSaslAuthentica
   implements SaslClientAuthenticationProvider {
 
   @Override
-  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
-    SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
-    Map<String, String> saslProps) throws IOException {
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr, 
String serverPrincipal,
+    Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 
Map<String, String> saslProps)
+    throws IOException {
     return Sasl.createSaslClient(new String[] { 
getSaslAuthMethod().getSaslMechanism() }, null,
       null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new 
DigestSaslClientCallbackHandler(token));
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
index 218fd13b60c..77e92b35bd8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java
@@ -24,10 +24,7 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityConstants;
-import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -43,46 +40,10 @@ public class GssSaslClientAuthenticationProvider extends 
GssSaslAuthenticationPr
   private static final Logger LOG =
     LoggerFactory.getLogger(GssSaslClientAuthenticationProvider.class);
 
-  private static boolean useCanonicalHostname(Configuration conf) {
-    return !conf.getBoolean(
-      
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS,
-      
SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS);
-  }
-
-  public static String getHostnameForServerPrincipal(Configuration conf, 
InetAddress addr) {
-    final String hostname;
-
-    if (useCanonicalHostname(conf)) {
-      hostname = addr.getCanonicalHostName();
-      if (hostname.equals(addr.getHostAddress())) {
-        LOG.warn("Canonical hostname for SASL principal is the same with IP 
address: " + hostname
-          + ", " + addr.getHostName() + ". Check DNS configuration or consider 
"
-          + 
SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + 
"=true");
-      }
-    } else {
-      hostname = addr.getHostName();
-    }
-
-    return hostname.toLowerCase();
-  }
-
-  String getServerPrincipal(Configuration conf, SecurityInfo securityInfo, 
InetAddress server)
-    throws IOException {
-    String hostname = getHostnameForServerPrincipal(conf, server);
-
-    String serverKey = securityInfo.getServerPrincipal();
-    if (serverKey == null) {
-      throw new IllegalArgumentException(
-        "Can't obtain server Kerberos config key from SecurityInfo");
-    }
-    return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname);
-  }
-
   @Override
-  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
-    SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
-    Map<String, String> saslProps) throws IOException {
-    String serverPrincipal = getServerPrincipal(conf, securityInfo, 
serverAddr);
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr, 
String serverPrincipal,
+    Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 
Map<String, String> saslProps)
+    throws IOException {
     LOG.debug("Setting up Kerberos RPC to server={}", serverPrincipal);
     String[] names = SaslUtil.splitKerberosName(serverPrincipal);
     if (names.length != 3) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
index bbc5ddac91a..4e23247ca76 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 
 /**
@@ -45,11 +46,33 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformati
 public interface SaslClientAuthenticationProvider extends 
SaslAuthenticationProvider {
 
   /**
-   * Creates the SASL client instance for this auth'n method.
+   * Creates the SASL client instance for this authentication method.
+   * @deprecated Since 2.5.8 and 2.6.0. Our own code no longer calls this 
method;
+   *             customized authentication methods should implement
+   *             {@link #createClient(Configuration, InetAddress, String, 
Token, boolean, Map)}
+   *             instead. Will be removed in 4.0.0.
    */
-  SaslClient createClient(Configuration conf, InetAddress serverAddr, 
SecurityInfo securityInfo,
-    Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 
Map<String, String> saslProps)
-    throws IOException;
+  @Deprecated
+  default SaslClient createClient(Configuration conf, InetAddress serverAddr,
+    SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
+    Map<String, String> saslProps) throws IOException {
+    throw new UnsupportedOperationException("should not be used any more");
+  }
+
+  /**
+   * Create the SASL client instance for this authentication method.
+   * <p>
+   * The default implementation is create a fake {@link SecurityInfo} and call 
the above method, for
+   * keeping compatible with old customized authentication method
+   */
+  default SaslClient createClient(Configuration conf, InetAddress serverAddr,
+    String serverPrincipal, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
+    Map<String, String> saslProps) throws IOException {
+    String principalKey = "hbase.fake.kerberos.principal";
+    conf.set(principalKey, serverPrincipal);
+    return createClient(conf, serverAddr, new SecurityInfo(principalKey, 
Kind.HBASE_AUTH_TOKEN),
+      token, fallbackAllowed, saslProps);
+  }
 
   /**
    * Constructs a {@link UserInformation} from the given {@link 
UserGroupInformation}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
index 6fff703689c..70e469003c8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java
@@ -22,7 +22,6 @@ import java.net.InetAddress;
 import java.util.Map;
 import javax.security.sasl.SaslClient;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -37,7 +36,7 @@ public class SimpleSaslClientAuthenticationProvider extends 
SimpleSaslAuthentica
 
   @Override
   public SaslClient createClient(Configuration conf, InetAddress serverAddress,
-    SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
+    String serverPrincipal, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
     Map<String, String> saslProps) throws IOException {
     return null;
   }
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
index 7b42ba224fa..6b1e7c33832 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -53,10 +53,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,9 +78,6 @@ public class TestHBaseSaslRpcClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHBaseSaslRpcClient.class);
 
-  @Rule
-  public ExpectedException exception = ExpectedException.none();
-
   @Test
   public void testSaslClientUsesGivenRpcProtection() throws Exception {
     Token<? extends TokenIdentifier> token =
@@ -90,8 +85,7 @@ public class TestHBaseSaslRpcClient {
     DigestSaslClientAuthenticationProvider provider = new 
DigestSaslClientAuthenticationProvider();
     for (SaslUtil.QualityOfProtection qop : 
SaslUtil.QualityOfProtection.values()) {
       String negotiatedQop = new 
HBaseSaslRpcClient(HBaseConfiguration.create(), provider, token,
-        Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), 
false, qop.name(),
-        false) {
+        Mockito.mock(InetAddress.class), "", false, qop.name(), false) {
         public String getQop() {
           return saslProps.get(Sasl.QOP);
         }
@@ -192,14 +186,14 @@ public class TestHBaseSaslRpcClient {
     DigestSaslClientAuthenticationProvider provider = new 
DigestSaslClientAuthenticationProvider() {
       @Override
       public SaslClient createClient(Configuration conf, InetAddress 
serverAddress,
-        SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, 
boolean fallbackAllowed,
+        String serverPrincipal, Token<? extends TokenIdentifier> token, 
boolean fallbackAllowed,
         Map<String, String> saslProps) {
         return Mockito.mock(SaslClient.class);
       }
     };
     HBaseSaslRpcClient rpcClient = new 
HBaseSaslRpcClient(HBaseConfiguration.create(), provider,
-      createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class),
-      Mockito.mock(SecurityInfo.class), false);
+      createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class), "",
+      false);
 
     try {
       rpcClient.getInputStream();
@@ -224,14 +218,14 @@ public class TestHBaseSaslRpcClient {
         new DigestSaslClientAuthenticationProvider() {
           @Override
           public SaslClient createClient(Configuration conf, InetAddress 
serverAddress,
-            SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
-            boolean fallbackAllowed, Map<String, String> saslProps) {
+            String serverPrincipal, Token<? extends TokenIdentifier> token, 
boolean fallbackAllowed,
+            Map<String, String> saslProps) {
             return null;
           }
         };
       new HBaseSaslRpcClient(HBaseConfiguration.create(), provider,
-        createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class),
-        Mockito.mock(SecurityInfo.class), false);
+        createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class), "",
+        false);
       return false;
     } catch (IOException ex) {
       return true;
@@ -254,8 +248,8 @@ public class TestHBaseSaslRpcClient {
     try {
       rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(),
         new DigestSaslClientAuthenticationProvider(),
-        createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class),
-        Mockito.mock(SecurityInfo.class), false);
+        createTokenMockWithCredentials(principal, password), 
Mockito.mock(InetAddress.class), "",
+        false);
     } catch (Exception ex) {
       LOG.error(ex.getMessage(), ex);
     }
@@ -275,7 +269,7 @@ public class TestHBaseSaslRpcClient {
   private HBaseSaslRpcClient createSaslRpcClientForKerberos() throws 
IOException {
     return new HBaseSaslRpcClient(HBaseConfiguration.create(),
       new GssSaslClientAuthenticationProvider(), createTokenMock(), 
Mockito.mock(InetAddress.class),
-      Mockito.mock(SecurityInfo.class), false);
+      "", false);
   }
 
   private Token<? extends TokenIdentifier> 
createTokenMockWithCredentials(String principal,
@@ -291,7 +285,7 @@ public class TestHBaseSaslRpcClient {
   private HBaseSaslRpcClient createSaslRpcClientSimple() throws IOException {
     return new HBaseSaslRpcClient(HBaseConfiguration.create(),
       new SimpleSaslClientAuthenticationProvider(), createTokenMock(),
-      Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), 
false);
+      Mockito.mock(InetAddress.class), "", false);
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
index d0930a0f314..3b83d7dda63 100644
--- 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
+++ 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java
@@ -31,7 +31,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import 
org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,9 +45,9 @@ public class ShadeSaslClientAuthenticationProvider extends 
ShadeSaslAuthenticati
   implements SaslClientAuthenticationProvider {
 
   @Override
-  public SaslClient createClient(Configuration conf, InetAddress serverAddr,
-    SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
-    Map<String, String> saslProps) throws IOException {
+  public SaslClient createClient(Configuration conf, InetAddress serverAddr, 
String serverPrincipal,
+    Token<? extends TokenIdentifier> token, boolean fallbackAllowed, 
Map<String, String> saslProps)
+    throws IOException {
     return Sasl.createSaslClient(new String[] { 
getSaslAuthMethod().getSaslMechanism() }, null,
       null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new 
ShadeSaslClientCallbackHandler(token));
   }
diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto 
b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
index e992e681fbf..3e44f8e16fa 100644
--- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
@@ -159,3 +159,7 @@ message ResponseHeader {
   // If present, then an encoded data block follows.
   optional CellBlockMeta cell_block_meta = 3;
 }
+
+message SecurityPreamableResponse {
+  required string server_principal = 1;
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 0876a1fd55f..a84d132a013 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -67,6 +67,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.gson.Gson;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -117,6 +118,7 @@ public abstract class RpcServer implements 
RpcServerInterface, ConfigurationObse
     LoggerFactory.getLogger("SecurityLogger." + Server.class.getName());
   protected SecretManager<TokenIdentifier> secretManager;
   protected final Map<String, String> saslProps;
+  protected final String serverPrincipal;
 
   protected ServiceAuthorizationManager authManager;
 
@@ -211,7 +213,7 @@ public abstract class RpcServer implements 
RpcServerInterface, ConfigurationObse
 
   protected final RpcScheduler scheduler;
 
-  protected UserProvider userProvider;
+  protected final UserProvider userProvider;
 
   protected final ByteBuffAllocator bbAllocator;
 
@@ -300,8 +302,11 @@ public abstract class RpcServer implements 
RpcServerInterface, ConfigurationObse
     if (isSecurityEnabled) {
       saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection",
         QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
+      serverPrincipal = 
Preconditions.checkNotNull(userProvider.getCurrentUserName(),
+        "can not get current user name when security is enabled");
     } else {
       saslProps = Collections.emptyMap();
+      serverPrincipal = HConstants.EMPTY_STRING;
     }
 
     this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index be97ad582c3..31f46f30c38 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -88,6 +88,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
@@ -695,6 +696,13 @@ abstract class ServerRpcConnection implements Closeable {
     doRespond(getErrorResponse(msg, e));
   }
 
+  private void doPreambleResponse(Message resp) throws IOException {
+    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
+    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, 
null);
+    BufferChain bufChain = new BufferChain(buf);
+    doRespond(() -> bufChain);
+  }
+
   private boolean doConnectionRegistryResponse() throws IOException {
     if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) {
       // should be in tests or some scenarios where we should not reach here
@@ -710,13 +718,22 @@ abstract class ServerRpcConnection implements Closeable {
     }
     GetConnectionRegistryResponse resp =
       
GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build();
-    ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build();
-    ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, 
null);
-    BufferChain bufChain = new BufferChain(buf);
-    doRespond(() -> bufChain);
+    doPreambleResponse(resp);
     return true;
   }
 
+  private void doSecurityPreambleResponse() throws IOException {
+    if (rpcServer.isSecurityEnabled) {
+      SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder()
+        .setServerPrincipal(rpcServer.serverPrincipal).build();
+      doPreambleResponse(resp);
+    } else {
+      // security is not enabled, so no principal is needed when connecting; 
throw a special exception
+      // to let the client know it should just use simple authentication
+      doRespond(getErrorResponse("security is not enabled", new 
SecurityNotEnabledException()));
+    }
+  }
+
   protected final void callCleanupIfNeeded() {
     if (callCleanup != null) {
       callCleanup.run();
@@ -738,6 +755,13 @@ abstract class ServerRpcConnection implements Closeable {
     ) {
       return PreambleResponse.CLOSE;
     }
+    if (
+      ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6,
+        RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6)
+    ) {
+      doSecurityPreambleResponse();
+      return PreambleResponse.CONTINUE;
+    }
     if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, 
RPC_HEADER, 0, 4)) {
       doBadPreambleHandling(
         "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received 
HEADER="
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index 9e90a7a3133..1b28c19b430 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -145,7 +145,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection 
{
         return count;
       case CONTINUE:
         // wait for the next preamble header
-        preambleBuffer.reset();
+        preambleBuffer.clear();
         return count;
       case CLOSE:
         return -1;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java
new file mode 100644
index 00000000000..237f1cb4025
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.security.sasl.SaslException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import 
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
+
+/**
+ * Tests for HBASE-28321, where we have multiple server principals candidates 
for a rpc service.
+ * <p>
+ * Put here just because we need to access some package-private classes under 
this package.
+ */
+@RunWith(Parameterized.class)
+@Category({ SecurityTests.class, MediumTests.class })
+public class TestMultipleServerPrincipalsIPC {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static final File KEYTAB_FILE =
+    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+
+  private static MiniKdc KDC;
+  private static String HOST = "localhost";
+  private static String SERVER_PRINCIPAL;
+  private static String SERVER_PRINCIPAL2;
+  private static String CLIENT_PRINCIPAL;
+
+  @Parameter(0)
+  public Class<? extends RpcServer> rpcServerImpl;
+
+  @Parameter(1)
+  public Class<? extends RpcClient> rpcClientImpl;
+
+  private Configuration clientConf;
+  private Configuration serverConf;
+  private UserGroupInformation clientUGI;
+  private UserGroupInformation serverUGI;
+  private RpcServer rpcServer;
+  private RpcClient rpcClient;
+
+  @Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}")
+  public static List<Object[]> params() {
+    List<Object[]> params = new ArrayList<>();
+    List<Class<? extends RpcServer>> rpcServerImpls =
+      Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class);
+    List<Class<? extends RpcClient>> rpcClientImpls =
+      Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class);
+    for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) {
+      for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) {
+        params.add(new Object[] { rpcServerImpl, rpcClientImpl });
+      }
+    }
+    return params;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
+    SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm();
+    SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm();
+    CLIENT_PRINCIPAL = "client";
+    KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, 
SERVER_PRINCIPAL2);
+    setSecuredConfiguration(TEST_UTIL.getConfiguration());
+    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 
1);
+    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 
0);
+    TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 
10);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    if (KDC != null) {
+      KDC.stop();
+    }
+  }
+
+  private static void setSecuredConfiguration(Configuration conf) {
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
+  }
+
+  private void loginAndStartRpcServer(String principal, int port) throws 
Exception {
+    UserGroupInformation.setConfiguration(serverConf);
+    serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,
+      KEYTAB_FILE.getCanonicalPath());
+    rpcServer = serverUGI.doAs((PrivilegedExceptionAction<
+      RpcServer>) () -> RpcServerFactory.createRpcServer(null, 
getClass().getSimpleName(),
+        Lists.newArrayList(
+          new 
RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, 
null)),
+        new InetSocketAddress(HOST, port), serverConf, new 
FifoRpcScheduler(serverConf, 1)));
+    rpcServer.start();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    clientConf = new Configuration(TEST_UTIL.getConfiguration());
+    clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 
rpcClientImpl,
+      RpcClient.class);
+    String serverPrincipalConfigName = "hbase.test.multiple.principal.first";
+    String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second";
+    clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL);
+    clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2);
+    serverConf = new Configuration(TEST_UTIL.getConfiguration());
+    serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, 
rpcServerImpl,
+      RpcServer.class);
+    SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, 
serverPrincipalConfigName2,
+      serverPrincipalConfigName);
+    SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), 
securityInfo);
+
+    UserGroupInformation.setConfiguration(clientConf);
+    clientUGI = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
+      KEYTAB_FILE.getCanonicalPath());
+    loginAndStartRpcServer(SERVER_PRINCIPAL, 0);
+    rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> 
RpcClientFactory
+      .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString()));
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(rpcClient, true);
+    rpcServer.stop();
+  }
+
+  private String echo(String msg) throws Exception {
+    return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> {
+      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
+        ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), 
-1), User.getCurrent(),
+        10000);
+      TestProtobufRpcProto.BlockingInterface stub = 
TestProtobufRpcProto.newBlockingStub(channel);
+      return stub.echo(null, 
TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build())
+        .getMessage();
+    });
+  }
+
+  @Test
+  public void testEcho() throws Exception {
+    String msg = "Hello World";
+    assertEquals(msg, echo(msg));
+  }
+
+  @Test
+  public void testMaliciousServer() throws Exception {
+    // reset the server principals so the principal returned by server does 
not match
+    SecurityInfo securityInfo =
+      SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName());
+    for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) {
+      clientConf.set(securityInfo.getServerPrincipals().get(i),
+        "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm());
+    }
+    UndeclaredThrowableException error =
+      assertThrows(UndeclaredThrowableException.class, () -> echo("whatever"));
+    assertThat(error.getCause(), instanceOf(ServiceException.class));
+    assertThat(error.getCause().getCause(), instanceOf(SaslException.class));
+  }
+
+  @Test
+  public void testRememberLastSucceededServerPrincipal() throws Exception {
+    // after this call we will remember the last succeeded server principal
+    assertEquals("a", echo("a"));
+    // shutdown the connection, but does not remove it from pool
+    RpcConnection conn =
+      Iterables.getOnlyElement(((AbstractRpcClient<?>) 
rpcClient).getConnections().values());
+    conn.shutdown();
+    // recreate rpc server with server principal2
+    int port = rpcServer.getListenerAddress().getPort();
+    rpcServer.stop();
+    serverUGI.logoutUserFromKeytab();
+    loginAndStartRpcServer(SERVER_PRINCIPAL2, port);
+    // this time we will still use the remembered server principal, so we will 
get a sasl exception
+    UndeclaredThrowableException error =
+      assertThrows(UndeclaredThrowableException.class, () -> echo("a"));
+    assertThat(error.getCause(), instanceOf(ServiceException.class));
+    // created by IPCUtil.wrap, to prepend the server address
+    assertThat(error.getCause().getCause(), instanceOf(IOException.class));
+    // wrapped by IPCUtil.toIOE
+    assertThat(error.getCause().getCause().getCause(), 
instanceOf(IOException.class));
+    Throwable cause = error.getCause().getCause().getCause().getCause();
+    // for the netty rpc client it is DecoderException, for the blocking rpc client, 
it is already
+    // RemoteException
+    assertThat(cause,
+      
either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class)));
+    RemoteException rme;
+    if (!(cause instanceof RemoteException)) {
+      assertThat(cause.getCause(), instanceOf(RemoteException.class));
+      rme = (RemoteException) cause.getCause();
+    } else {
+      rme = (RemoteException) cause;
+    }
+    assertEquals(SaslException.class.getName(), rme.getClassName());
+    // the above failure will clear the remembered server principal, so this 
time we will get the
+    // correct one. We use retry here just because a failure of sasl 
negotiation will trigger a
+    // relogin and it may take some time, and for netty based implementation 
the relogin is async
+    TEST_UTIL.waitFor(10000, () -> {
+      try {
+        echo("a");
+      } catch (UndeclaredThrowableException e) {
+        Throwable t = e.getCause().getCause();
+        assertThat(t, instanceOf(IOException.class));
+        if (!(t instanceof FailedServerException)) {
+          // for netty rpc client
+          assertThat(e.getCause().getMessage(),
+            containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS));
+        }
+        return false;
+      }
+      return true;
+    });
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
index bc791754a12..345514396d6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -125,8 +126,8 @@ public class TestRpcSkipInitialSaslHandshake {
   @Test
   public void test() throws Exception {
     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    Mockito.when(securityInfoMock.getServerPrincipals())
+      .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL));
     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
 
     final AtomicReference<NettyServerRpcConnection> conn = new 
AtomicReference<>(null);
@@ -152,7 +153,6 @@ public class TestRpcSkipInitialSaslHandshake {
           .getMessage();
       assertTrue("test".equals(response));
       assertFalse(conn.get().useSasl);
-
     } finally {
       rpcServer.stop();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java
index b5e46b5c7cf..a74477bf28c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -122,8 +123,8 @@ public class TestSecurityRpcSentBytesMetrics {
   @Test
   public void test() throws Exception {
     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    Mockito.when(securityInfoMock.getServerPrincipals())
+      .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL));
     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
 
     NettyRpcServer rpcServer = new NettyRpcServer(null, 
getClass().getSimpleName(),
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
index 998896c9468..31e01a98ad6 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java
@@ -104,7 +104,7 @@ public class AbstractTestSecureIPC {
     TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 
100);
   }
 
-  protected static void stopKDC() throws InterruptedException {
+  protected static void stopKDC() {
     if (KDC != null) {
       KDC.stop();
     }
@@ -192,8 +192,8 @@ public class AbstractTestSecureIPC {
       return new SaslClientAuthenticationProvider() {
         @Override
         public SaslClient createClient(Configuration conf, InetAddress 
serverAddr,
-          SecurityInfo securityInfo, Token<? extends TokenIdentifier> token,
-          boolean fallbackAllowed, Map<String, String> saslProps) throws 
IOException {
+          String serverPrincipal, Token<? extends TokenIdentifier> token, 
boolean fallbackAllowed,
+          Map<String, String> saslProps) throws IOException {
           final String s = conf.get(CANONICAL_HOST_NAME_KEY);
           if (s != null) {
             try {
@@ -206,7 +206,7 @@ public class AbstractTestSecureIPC {
             }
           }
 
-          return delegate.createClient(conf, serverAddr, securityInfo, token, 
fallbackAllowed,
+          return delegate.createClient(conf, serverAddr, serverPrincipal, 
token, fallbackAllowed,
             saslProps);
         }
 
@@ -385,8 +385,8 @@ public class AbstractTestSecureIPC {
    */
   private void callRpcService(User serverUser, User clientUser) throws 
Exception {
     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    Mockito.when(securityInfoMock.getServerPrincipals())
+      .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL));
     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
 
     InetSocketAddress isa = new InetSocketAddress(HOST, 0);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java
new file mode 100644
index 00000000000..6f1cc148204
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
+import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.NettyRpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.RpcServerFactory;
+import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import 
org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
+
+/**
+ * Test secure client connecting to a non secure server, where we have 
multiple server principal
+ * candidates for a rpc service. See HBASE-28321.
+ */
+@RunWith(Parameterized.class)
+@Category({ SecurityTests.class, MediumTests.class })
+public class TestMultipleServerPrincipalsFallbackToSimple {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestMultipleServerPrincipalsFallbackToSimple.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static final File KEYTAB_FILE =
+    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+
+  private static MiniKdc KDC;
+  private static String HOST = "localhost";
+  private static String SERVER_PRINCIPAL;
+  private static String SERVER_PRINCIPAL2;
+  private static String CLIENT_PRINCIPAL;
+
+  @Parameter
+  public Class<? extends RpcClient> rpcClientImpl;
+
+  private Configuration clientConf;
+  private UserGroupInformation clientUGI;
+  private RpcServer rpcServer;
+  private RpcClient rpcClient;
+
+  @Parameters(name = "{index}: rpcClientImpl={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { NettyRpcClient.class },
+      new Object[] { BlockingRpcClient.class });
+  }
+
+  private static void setSecuredConfiguration(Configuration conf) {
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+    conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
+    conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
+    SERVER_PRINCIPAL = "server/" + HOST;
+    SERVER_PRINCIPAL2 = "server2/" + HOST;
+    CLIENT_PRINCIPAL = "client";
+    KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, 
SERVER_PRINCIPAL2);
+    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 
1);
+    TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 
0);
+    TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 
10);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    clientConf = new Configuration(TEST_UTIL.getConfiguration());
+    setSecuredConfiguration(clientConf);
+    clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, 
rpcClientImpl,
+      RpcClient.class);
+    String serverPrincipalConfigName = "hbase.test.multiple.principal.first";
+    String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second";
+    clientConf.set(serverPrincipalConfigName, "server/localhost@" + 
KDC.getRealm());
+    clientConf.set(serverPrincipalConfigName2, "server2/localhost@" + 
KDC.getRealm());
+    SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, 
serverPrincipalConfigName2,
+      serverPrincipalConfigName);
+    SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), 
securityInfo);
+
+    UserGroupInformation.setConfiguration(clientConf);
+    clientUGI = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL,
+      KEYTAB_FILE.getCanonicalPath());
+
+    rpcServer = RpcServerFactory.createRpcServer(null, 
getClass().getSimpleName(),
+      Lists.newArrayList(
+        new 
RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, 
null)),
+      new InetSocketAddress(HOST, 0), TEST_UTIL.getConfiguration(),
+      new FifoRpcScheduler(TEST_UTIL.getConfiguration(), 1));
+    rpcServer.start();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(rpcClient, true);
+    rpcServer.stop();
+  }
+
+  private RpcClient createClient() throws Exception {
+    return clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> 
RpcClientFactory
+      .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString()));
+  }
+
+  private String echo(String msg) throws Exception {
+    return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> {
+      BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
+        ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), 
-1), User.getCurrent(),
+        10000);
+      TestProtobufRpcProto.BlockingInterface stub = 
TestProtobufRpcProto.newBlockingStub(channel);
+      return stub.echo(null, 
TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build())
+        .getMessage();
+    });
+  }
+
+  @Test
+  public void testAllowFallbackToSimple() throws Exception {
+    
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 
true);
+    rpcClient = createClient();
+    assertEquals("allow", echo("allow"));
+  }
+
+  @Test
+  public void testDisallowFallbackToSimple() throws Exception {
+    
clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, 
false);
+    rpcClient = createClient();
+    UndeclaredThrowableException error =
+      assertThrows(UndeclaredThrowableException.class, () -> echo("disallow"));
+    Throwable cause = error.getCause().getCause().getCause();
+    assertThat(cause, instanceOf(FallbackDisallowedException.class));
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java
index a6984fcdf3a..ea9b6948011 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java
@@ -21,6 +21,7 @@ import static 
org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosP
 import static 
org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration;
 
 import java.io.File;
+import java.util.Collections;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
@@ -66,8 +67,8 @@ public class TestSaslTlsIPCRejectPlainText extends 
AbstractTestTlsRejectPlainTex
     UGI = loginKerberosPrincipal(KEYTAB_FILE.getCanonicalPath(), PRINCIPAL);
     setSecuredConfiguration(util.getConfiguration());
     SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
-    Mockito.when(securityInfoMock.getServerPrincipal())
-      .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
+    Mockito.when(securityInfoMock.getServerPrincipals())
+      .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL));
     SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
   }
 
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java
index feba17364cc..66b65ba03f0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java
@@ -75,7 +75,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerFactory;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.SecureTestCluster;
 import org.apache.hadoop.hbase.security.token.TokenProvider;
@@ -202,7 +201,7 @@ public abstract class 
CustomSaslAuthenticationProviderTestBase {
 
     @Override
     public SaslClient createClient(Configuration conf, InetAddress serverAddr,
-      SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, 
boolean fallbackAllowed,
+      String serverPrincipal, Token<? extends TokenIdentifier> token, boolean 
fallbackAllowed,
       Map<String, String> saslProps) throws IOException {
       return Sasl.createSaslClient(new String[] { MECHANISM }, null, null,
         SaslUtil.SASL_DEFAULT_REALM, saslProps, new 
InMemoryClientProviderCallbackHandler(token));

Reply via email to