This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push: new ee068ba8470 HBASE-28321 RpcConnectionRegistry is broken when security is enabled and we use different principal for master and region server (#5688) ee068ba8470 is described below commit ee068ba8470dc7303a18f753dd8a6db66e5ac44c Author: Duo Zhang <zhang...@apache.org> AuthorDate: Sun Feb 25 20:50:13 2024 +0800 HBASE-28321 RpcConnectionRegistry is broken when security is enabled and we use different principal for master and region server (#5688) Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org> (cherry picked from commit c4a02f7fcd1d74385b87bb761d25b118ce080119) --- .../hadoop/hbase/client/ConnectionUtils.java | 2 +- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 7 + .../hadoop/hbase/ipc/BlockingRpcConnection.java | 104 ++++++-- .../java/org/apache/hadoop/hbase/ipc/Call.java | 8 +- .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java | 5 + .../hadoop/hbase/ipc/NettyRpcConnection.java | 127 ++++++++-- .../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 2 +- .../hadoop/hbase/ipc/PreambleCallHandler.java | 95 +++++++ .../org/apache/hadoop/hbase/ipc/RpcClient.java | 2 + .../org/apache/hadoop/hbase/ipc/RpcConnection.java | 137 +++++++++- .../hbase/ipc/SecurityNotEnabledException.java | 34 +++ .../hbase/security/AbstractHBaseSaslRpcClient.java | 36 +-- .../hadoop/hbase/security/HBaseSaslRpcClient.java | 8 +- .../hbase/security/NettyHBaseSaslRpcClient.java | 4 +- .../security/NettyHBaseSaslRpcClientHandler.java | 4 +- .../apache/hadoop/hbase/security/SecurityInfo.java | 30 ++- .../DigestSaslClientAuthenticationProvider.java | 7 +- .../GssSaslClientAuthenticationProvider.java | 45 +--- .../provider/SaslClientAuthenticationProvider.java | 31 ++- .../SimpleSaslClientAuthenticationProvider.java | 3 +- .../hbase/security/TestHBaseSaslRpcClient.java | 30 +-- .../ShadeSaslClientAuthenticationProvider.java | 7 +- .../src/main/protobuf/rpc/RPC.proto | 4 + 
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 7 +- .../hadoop/hbase/ipc/ServerRpcConnection.java | 32 ++- .../hbase/ipc/SimpleServerRpcConnection.java | 2 +- .../hbase/ipc/TestMultipleServerPrincipalsIPC.java | 277 +++++++++++++++++++++ .../hbase/ipc/TestRpcSkipInitialSaslHandshake.java | 6 +- .../hbase/ipc/TestSecurityRpcSentBytesMetrics.java | 5 +- .../hbase/security/AbstractTestSecureIPC.java | 12 +- ...stMultipleServerPrincipalsFallbackToSimple.java | 189 ++++++++++++++ .../security/TestSaslTlsIPCRejectPlainText.java | 5 +- .../CustomSaslAuthenticationProviderTestBase.java | 3 +- 33 files changed, 1084 insertions(+), 186 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d073fef929f..84acc6e4d39 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -665,7 +665,7 @@ public final class ConnectionUtils { } } - static boolean isUnexpectedPreambleHeaderException(IOException e) { + public static boolean isUnexpectedPreambleHeaderException(IOException e) { if (!(e instanceof RemoteException)) { return false; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 7972cc08acd..3742eb8118a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import com.google.errorprone.annotations.RestrictedApi; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import 
io.opentelemetry.context.Scope; @@ -542,6 +543,12 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); } + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + PoolMap<ConnectionId, T> getConnections() { + return connections; + } + private static class AbstractRpcChannel { protected final Address addr; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 3f1418aa984..4b3d2de466b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -37,12 +37,14 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Locale; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import javax.security.sasl.SaslException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; @@ -352,13 +354,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { } } - private boolean setupSaslConnection(final InputStream in2, final OutputStream out2) - throws IOException { + private boolean setupSaslConnection(final InputStream in2, final OutputStream out2, + String serverPrincipal) throws IOException { if (this.metrics != null) { this.metrics.incrNsLookups(); } saslRpcClient = new 
HBaseSaslRpcClient(this.rpcClient.conf, provider, token, - socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed, + socket.getInetAddress(), serverPrincipal, this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); @@ -379,7 +381,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { * </p> */ private void handleSaslConnectionFailure(final int currRetries, final int maxRetries, - final Exception ex, final UserGroupInformation user) throws IOException, InterruptedException { + final Exception ex, final UserGroupInformation user, final String serverPrincipal) + throws IOException, InterruptedException { closeSocket(); user.doAs(new PrivilegedExceptionAction<Object>() { @Override @@ -419,17 +422,22 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1); return null; } else { - String msg = - "Failed to initiate connection for " + UserGroupInformation.getLoginUser().getUserName() - + " to " + securityInfo.getServerPrincipal(); + String msg = "Failed to initiate connection for " + + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal; throw new IOException(msg, ex); } } }); } - private void getConnectionRegistry(OutputStream outStream) throws IOException { + private void getConnectionRegistry(InputStream inStream, OutputStream outStream, + Call connectionRegistryCall) throws IOException { outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER); + readResponse(new DataInputStream(inStream), calls, connectionRegistryCall, remoteExc -> { + synchronized (this) { + closeConn(remoteExc); + } + }); } private void createStreams(InputStream inStream, OutputStream outStream) { @@ -437,7 +445,52 @@ class BlockingRpcConnection extends RpcConnection implements 
Runnable { this.out = new DataOutputStream(new BufferedOutputStream(outStream)); } - private void setupIOstreams() throws IOException { + // choose the server principal to use + private String chooseServerPrincipal(InputStream inStream, OutputStream outStream) + throws IOException { + Set<String> serverPrincipals = getServerPrincipals(); + if (serverPrincipals.size() == 1) { + return serverPrincipals.iterator().next(); + } + // this means we use kerberos authentication and there are multiple server principal candidates, + // in this way we need to send a special preamble header to get server principal from server + Call securityPreambleCall = createSecurityPreambleCall(r -> { + }); + outStream.write(RpcClient.SECURITY_PREAMBLE_HEADER); + readResponse(new DataInputStream(inStream), calls, securityPreambleCall, remoteExc -> { + synchronized (this) { + closeConn(remoteExc); + } + }); + if (securityPreambleCall.error != null) { + LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, + securityPreambleCall.error); + if (ConnectionUtils.isUnexpectedPreambleHeaderException(securityPreambleCall.error)) { + // this means we are connecting to an old server which does not support the security + // preamble call, so we should fallback to randomly select a principal to use + // TODO: find a way to reconnect without failing all the pending calls, for now, when we + // reach here, shutdown should have already been scheduled + throw securityPreambleCall.error; + } + if (IPCUtil.isSecurityNotEnabledException(securityPreambleCall.error)) { + // server tells us security is not enabled, then we should check whether fallback to + // simple is allowed, if so we just go without security, otherwise we should fail the + // negotiation immediately + if (rpcClient.fallbackAllowed) { + // TODO: just change the preamble and skip the fallback to simple logic, for now, just + // select the first principal can finish the connection setup, but waste one client + // 
message + return serverPrincipals.iterator().next(); + } else { + throw new FallbackDisallowedException(); + } + } + return randomSelect(serverPrincipals); + } + return chooseServerPrincipal(serverPrincipals, securityPreambleCall); + } + + private void setupIOstreams(Call connectionRegistryCall) throws IOException { if (socket != null) { // The connection is already available. Perfect. return; @@ -465,32 +518,37 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { // This creates a socket with a write timeout. This timeout cannot be changed. OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); if (connectionRegistryCall != null) { - getConnectionRegistry(outStream); - createStreams(inStream, outStream); - break; + getConnectionRegistry(inStream, outStream, connectionRegistryCall); + closeSocket(); + return; } - // Write out the preamble -- MAGIC, version, and auth to use. - writeConnectionHeaderPreamble(outStream); + if (useSasl) { - final InputStream in2 = inStream; - final OutputStream out2 = outStream; UserGroupInformation ticket = provider.getRealUser(remoteId.ticket); boolean continueSasl; if (ticket == null) { throw new FatalConnectionException("ticket/user is null"); } + String serverPrincipal = chooseServerPrincipal(inStream, outStream); + // Write out the preamble -- MAGIC, version, and auth to use. 
+ writeConnectionHeaderPreamble(outStream); try { + final InputStream in2 = inStream; + final OutputStream out2 = outStream; continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { - return setupSaslConnection(in2, out2); + return setupSaslConnection(in2, out2, serverPrincipal); } }); } catch (Exception ex) { ExceptionUtil.rethrowIfInterrupt(ex); - handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket); + saslNegotiationDone(serverPrincipal, false); + handleSaslConnectionFailure(numRetries++, reloginMaxRetries, ex, ticket, + serverPrincipal); continue; } + saslNegotiationDone(serverPrincipal, true); if (continueSasl) { // Sasl connect is successful. Let's set up Sasl i/o streams. inStream = saslRpcClient.getInputStream(); @@ -501,6 +559,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { // reconnecting because regionserver may change its sasl config after restart. saslRpcClient = null; } + } else { + // Write out the preamble -- MAGIC, version, and auth to use. + writeConnectionHeaderPreamble(outStream); } createStreams(inStream, outStream); // Now write out the connection header @@ -618,9 +679,10 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { } RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); if (call.isConnectionRegistryCall()) { - connectionRegistryCall = call; + setupIOstreams(call); + return; } - setupIOstreams(); + setupIOstreams(null); // Now we're going to write the call. We take the lock, then check that the connection // is still valid, and, if so we do the write to the socket. 
If the write fails, we don't @@ -655,7 +717,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { */ private void readResponse() { try { - readResponse(in, calls, remoteExc -> { + readResponse(in, calls, null, remoteExc -> { synchronized (this) { closeConn(remoteExc); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index d175ea0b6e9..980e708d235 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.util.Map; -import java.util.Optional; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.hadoop.hbase.CellScanner; @@ -85,16 +84,15 @@ class Call { * Builds a simplified {@link #toString()} that includes just the id and method name. */ public String toShortString() { + // Call[id=32153218,methodName=Get] return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id) - .append("methodName", md.getName()).toString(); + .append("methodName", md != null ? md.getName() : "").toString(); } @Override public String toString() { - // Call[id=32153218,methodName=Get] return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).appendSuper(toShortString()) - .append("param", Optional.ofNullable(param).map(ProtobufUtil::getShortTextFormat).orElse("")) - .toString(); + .append("param", param != null ? 
ProtobufUtil.getShortTextFormat(param) : "").toString(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index bf4b833e856..42094eb45e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -178,6 +178,11 @@ class IPCUtil { } } + static boolean isSecurityNotEnabledException(IOException e) { + return e instanceof RemoteException + && SecurityNotEnabledException.class.getName().equals(((RemoteException) e).getClassName()); + } + static IOException toIOE(Throwable t) { if (t instanceof IOException) { return (IOException) t; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index a0f8f10d1cf..1618709fa9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -26,10 +26,13 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import javax.security.sasl.SaslException; +import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; @@ -157,7 +160,7 @@ class NettyRpcConnection extends RpcConnection { }); } - private void established(Channel ch) throws IOException { + private void established(Channel ch) { assert 
eventLoop.inEventLoop(); ch.pipeline() .addBefore(BufferCallBeforeInitHandler.NAME, null, @@ -169,6 +172,11 @@ class NettyRpcConnection extends RpcConnection { .fireUserEventTriggered(BufferCallEvent.success()); } + private void saslEstablished(Channel ch, String serverPrincipal) { + saslNegotiationDone(serverPrincipal, true); + established(ch); + } + private boolean reloginInProgress; private void scheduleRelogin(Throwable error) { @@ -201,23 +209,31 @@ class NettyRpcConnection extends RpcConnection { // fail all pending calls ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); shutdown0(); + rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), e); + } + + private void saslFailInit(Channel ch, String serverPrincipal, IOException error) { + assert eventLoop.inEventLoop(); + saslNegotiationDone(serverPrincipal, false); + failInit(ch, error); } - private void saslNegotiate(final Channel ch) { + private void saslNegotiate(Channel ch, String serverPrincipal) { assert eventLoop.inEventLoop(); + NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate()); UserGroupInformation ticket = provider.getRealUser(remoteId.getTicket()); if (ticket == null) { - failInit(ch, new FatalConnectionException("ticket/user is null")); + saslFailInit(ch, serverPrincipal, new FatalConnectionException("ticket/user is null")); return; } Promise<Boolean> saslPromise = ch.eventLoop().newPromise(); final NettyHBaseSaslRpcClientHandler saslHandler; try { saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, - ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo, + ((InetSocketAddress) ch.remoteAddress()).getAddress(), serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf); } catch (IOException e) { - failInit(ch, e); + saslFailInit(ch, serverPrincipal, e); return; } ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder()) @@ -252,35 +268,99 @@ class 
NettyRpcConnection extends RpcConnection { p.remove(NettyHBaseRpcConnectionHeaderHandler.class); // don't send connection header, NettyHBaseRpcConnectionHeaderHandler // sent it already - established(ch); + saslEstablished(ch, serverPrincipal); } else { final Throwable error = future.cause(); scheduleRelogin(error); - failInit(ch, toIOE(error)); + saslFailInit(ch, serverPrincipal, toIOE(error)); } } }); } else { // send the connection header to server ch.write(connectionHeaderWithLength.retainedDuplicate()); - established(ch); + saslEstablished(ch, serverPrincipal); } } else { final Throwable error = future.cause(); scheduleRelogin(error); - failInit(ch, toIOE(error)); + saslFailInit(ch, serverPrincipal, toIOE(error)); } } }); } - private void getConnectionRegistry(Channel ch) throws IOException { - established(ch); - NettyFutureUtils.safeWriteAndFlush(ch, - Unpooled.directBuffer(6).writeBytes(RpcClient.REGISTRY_PREAMBLE_HEADER)); + private void getConnectionRegistry(Channel ch, Call connectionRegistryCall) throws IOException { + assert eventLoop.inEventLoop(); + PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this, + RpcClient.REGISTRY_PREAMBLE_HEADER, connectionRegistryCall); } - private void connect() throws UnknownHostException { + private void onSecurityPreambleError(Channel ch, Set<String> serverPrincipals, + IOException error) { + assert eventLoop.inEventLoop(); + LOG.debug("Error when trying to do a security preamble call to {}", remoteId.address, error); + if (ConnectionUtils.isUnexpectedPreambleHeaderException(error)) { + // this means we are connecting to an old server which does not support the security + // preamble call, so we should fallback to randomly select a principal to use + // TODO: find a way to reconnect without failing all the pending calls, for now, when we + // reach here, shutdown should have already been scheduled + return; + } + if (IPCUtil.isSecurityNotEnabledException(error)) { + // server tells us security is not enabled, 
then we should check whether fallback to + // simple is allowed, if so we just go without security, otherwise we should fail the + // negotiation immediately + if (rpcClient.fallbackAllowed) { + // TODO: just change the preamble and skip the fallback to simple logic, for now, just + // select the first principal can finish the connection setup, but waste one client + // message + saslNegotiate(ch, serverPrincipals.iterator().next()); + } else { + failInit(ch, new FallbackDisallowedException()); + } + return; + } + // usually we should not reach here, but for robust, just randomly select a principal to + // connect + saslNegotiate(ch, randomSelect(serverPrincipals)); + } + + private void onSecurityPreambleFinish(Channel ch, Set<String> serverPrincipals, + Call securityPreambleCall) { + assert eventLoop.inEventLoop(); + String serverPrincipal; + try { + serverPrincipal = chooseServerPrincipal(serverPrincipals, securityPreambleCall); + } catch (SaslException e) { + failInit(ch, e); + return; + } + saslNegotiate(ch, serverPrincipal); + } + + private void saslNegotiate(Channel ch) throws IOException { + assert eventLoop.inEventLoop(); + Set<String> serverPrincipals = getServerPrincipals(); + if (serverPrincipals.size() == 1) { + saslNegotiate(ch, serverPrincipals.iterator().next()); + return; + } + // this means we use kerberos authentication and there are multiple server principal candidates, + // in this way we need to send a special preamble header to get server principal from server + Call securityPreambleCall = createSecurityPreambleCall(call -> { + assert eventLoop.inEventLoop(); + if (call.error != null) { + onSecurityPreambleError(ch, serverPrincipals, call.error); + } else { + onSecurityPreambleFinish(ch, serverPrincipals, call); + } + }); + PreambleCallHandler.setup(ch.pipeline(), rpcClient.readTO, this, + RpcClient.SECURITY_PREAMBLE_HEADER, securityPreambleCall); + } + + private void connect(Call connectionRegistryCall) throws UnknownHostException { assert 
eventLoop.inEventLoop(); LOG.trace("Connecting to {}", remoteId.getAddress()); InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); @@ -310,16 +390,17 @@ class NettyRpcConnection extends RpcConnection { private void succeed(Channel ch) throws IOException { if (connectionRegistryCall != null) { - getConnectionRegistry(ch); + getConnectionRegistry(ch, connectionRegistryCall); return; } - NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate()); - if (useSasl) { - saslNegotiate(ch); - } else { - // send the connection header to server + if (!useSasl) { + // BufferCallBeforeInitHandler will call ctx.flush when receiving the + // BufferCallEvent.success() event, so here we just use write for the below two messages + NettyFutureUtils.safeWrite(ch, connectionHeaderPreamble.retainedDuplicate()); NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); established(ch); + } else { + saslNegotiate(ch); } } @@ -331,7 +412,6 @@ class NettyRpcConnection extends RpcConnection { connectionRegistryCall.setException(ex); } failInit(ch, ex); - rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), error); } @Override @@ -360,10 +440,9 @@ class NettyRpcConnection extends RpcConnection { private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { assert eventLoop.inEventLoop(); if (call.isConnectionRegistryCall()) { - connectionRegistryCall = call; // For get connection registry call, we will send a special preamble header to get the // response, instead of sending a real rpc call. 
See HBASE-25051 - connect(); + connect(call); return; } if (reloginInProgress) { @@ -386,7 +465,7 @@ class NettyRpcConnection extends RpcConnection { setCancelled(call); } else { if (channel == null) { - connect(); + connect(null); } scheduleTimeoutTask(call); channel.writeAndFlush(call).addListener(new ChannelFutureListener() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index 44772ae2dbf..ef3752bbf47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -122,7 +122,7 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler { private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { try { - conn.readResponse(new ByteBufInputStream(buf), id2Call, + conn.readResponse(new ByteBufInputStream(buf), id2Call, null, remoteExc -> exceptionCaught(ctx, remoteExc)); } catch (IOException e) { // In netty, the decoding the frame based, when reaching here we have already read a full diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java new file mode 100644 index 00000000000..1b2a7cf6ace --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PreambleCallHandler.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; +import org.apache.hadoop.hbase.util.NettyFutureUtils; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; +import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; +import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; + +/** + * Used to decode preamble calls. 
+ */ +@InterfaceAudience.Private +class PreambleCallHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final NettyRpcConnection conn; + + private final byte[] preambleHeader; + + private final Call preambleCall; + + PreambleCallHandler(NettyRpcConnection conn, byte[] preambleHeader, Call preambleCall) { + this.conn = conn; + this.preambleHeader = preambleHeader; + this.preambleCall = preambleCall; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + NettyFutureUtils.safeWriteAndFlush(ctx, + Unpooled.directBuffer(preambleHeader.length).writeBytes(preambleHeader)); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { + try { + conn.readResponse(new ByteBufInputStream(buf), new HashMap<>(), preambleCall, + remoteExc -> exceptionCaught(ctx, remoteExc)); + } finally { + ChannelPipeline p = ctx.pipeline(); + p.remove("PreambleCallReadTimeoutHandler"); + p.remove("PreambleCallFrameDecoder"); + p.remove(this); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + preambleCall.setException(new ConnectionClosedException("Connection closed")); + ctx.fireChannelInactive(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + preambleCall.setException(IPCUtil.toIOE(cause)); + } + + public static void setup(ChannelPipeline pipeline, int readTimeoutMs, NettyRpcConnection conn, + byte[] preambleHeader, Call preambleCall) { + // we do not use single decode here, as for a preamble call, we do not expect the server side + // will return multiple responses + pipeline + .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallReadTimeoutHandler", + new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS)) + .addBefore(BufferCallBeforeInitHandler.NAME, "PreambleCallFrameDecoder", + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) + .addBefore(BufferCallBeforeInitHandler.NAME, 
"PreambleCallHandler", + new PreambleCallHandler(conn, preambleHeader, preambleCall)); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 369430e337a..7011dc5e139 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -56,6 +56,8 @@ public interface RpcClient extends Closeable { byte[] REGISTRY_PREAMBLE_HEADER = new byte[] { 'R', 'e', 'g', 'i', 's', 't' }; + byte[] SECURITY_PREAMBLE_HEADER = new byte[] { 'S', 'e', 'c', 'u', 'r', 'i' }; + /** * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up protobuf * blocking stubs. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index 65f936d6fc3..8017e99ec4f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -21,17 +21,26 @@ import java.io.DataInput; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import javax.security.sasl.SaslException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.net.Address; +import 
org.apache.hadoop.hbase.security.AuthMethod; +import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; @@ -40,6 +49,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.yetus.audience.InterfaceAudience; @@ -47,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; @@ -58,6 +69,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; /** @@ -100,6 +112,12 @@ abstract class RpcConnection { protected SaslClientAuthenticationProvider provider; + // Record the server principal which we have successfully authenticated with the remote server + // this is used to save the extra round trip with server when there are multiple candidate server + // principals for a given rpc 
service, like ClientMetaService. + // See HBASE-28321 for more details. + private String lastSucceededServerPrincipal; + protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, CellBlockBuilder cellBlockBuilder, MetricsConnection metrics, @@ -221,6 +239,96 @@ abstract class RpcConnection { return remoteAddr; } + private static boolean useCanonicalHostname(Configuration conf) { + return !conf.getBoolean( + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, + SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS); + } + + private static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) { + final String hostname; + if (useCanonicalHostname(conf)) { + hostname = addr.getCanonicalHostName(); + if (hostname.equals(addr.getHostAddress())) { + LOG.warn("Canonical hostname for SASL principal is the same with IP address: " + hostname + + ", " + addr.getHostName() + ". 
Check DNS configuration or consider " + + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + "=true"); + } + } else { + hostname = addr.getHostName(); + } + + return hostname.toLowerCase(); + } + + private static String getServerPrincipal(Configuration conf, String serverKey, InetAddress server) + throws IOException { + String hostname = getHostnameForServerPrincipal(conf, server); + return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname); + } + + protected final boolean isKerberosAuth() { + return provider.getSaslAuthMethod().getCode() == AuthMethod.KERBEROS.code; + } + + protected final Set<String> getServerPrincipals() throws IOException { + // for authentication method other than kerberos, we do not need to know the server principal + if (!isKerberosAuth()) { + return Collections.singleton(HConstants.EMPTY_STRING); + } + // if we have successfully authenticated last time, just return the server principal we use last + // time + if (lastSucceededServerPrincipal != null) { + return Collections.singleton(lastSucceededServerPrincipal); + } + InetAddress server = + new InetSocketAddress(remoteId.address.getHostName(), remoteId.address.getPort()) + .getAddress(); + // Even if we have multiple config key in security info, it is still possible that we configured + // the same principal for them, so here we use a Set + Set<String> serverPrincipals = new TreeSet<>(); + for (String serverPrincipalKey : securityInfo.getServerPrincipals()) { + serverPrincipals.add(getServerPrincipal(conf, serverPrincipalKey, server)); + } + return serverPrincipals; + } + + protected final <T> T randomSelect(Collection<T> c) { + int select = ThreadLocalRandom.current().nextInt(c.size()); + int index = 0; + for (T t : c) { + if (index == select) { + return t; + } + index++; + } + return null; + } + + protected final String chooseServerPrincipal(Set<String> candidates, Call securityPreambleCall) + throws SaslException { + String principal = + 
((SecurityPreamableResponse) securityPreambleCall.response).getServerPrincipal(); + if (!candidates.contains(principal)) { + // this means the server returns principal which is not in our candidates, it could be a + // malicious server, stop connecting + throw new SaslException(remoteId.address + " tells us to use server principal " + principal + + " which is not expected, should be one of " + candidates); + } + return principal; + } + + protected final void saslNegotiationDone(String serverPrincipal, boolean succeed) { + LOG.debug("sasl negotiation done with serverPrincipal = {}, succeed = {}", serverPrincipal, + succeed); + if (succeed) { + this.lastSucceededServerPrincipal = serverPrincipal; + } else { + // clear the recorded principal if authentication failed + this.lastSucceededServerPrincipal = null; + } + } + protected abstract void callTimeout(Call call); public ConnectionId remoteId() { @@ -252,7 +360,10 @@ abstract class RpcConnection { */ public abstract void cleanupConnection(); - protected Call connectionRegistryCall; + protected final Call createSecurityPreambleCall(RpcCallback<Call> callback) { + return new Call(-1, null, null, null, SecurityPreamableResponse.getDefaultInstance(), 0, 0, + Collections.emptyMap(), callback, MetricsConnection.newCallStats()); + } private <T extends InputStream & DataInput> void finishCall(ResponseHeader responseHeader, T in, Call call) throws IOException { @@ -286,7 +397,7 @@ abstract class RpcConnection { } <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, Call> id2Call, - Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException { + Call preambleCall, Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException { int totalSize = in.readInt(); ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); int id = responseHeader.getCallId(); @@ -301,9 +412,8 @@ abstract class RpcConnection { if (IPCUtil.isFatalConnectionException(exceptionResponse)) { // 
Here we will cleanup all calls so do not need to fall back, just return. fatalConnectionErrorConsumer.accept(remoteExc); - if (connectionRegistryCall != null) { - connectionRegistryCall.setException(remoteExc); - connectionRegistryCall = null; + if (preambleCall != null) { + preambleCall.setException(remoteExc); } return; } @@ -311,10 +421,19 @@ abstract class RpcConnection { remoteExc = null; } if (id < 0) { - if (connectionRegistryCall != null) { - LOG.debug("process connection registry call"); - finishCall(responseHeader, in, connectionRegistryCall); - connectionRegistryCall = null; + LOG.debug("process preamble call response with response type {}", + preambleCall != null + ? preambleCall.responseDefaultType.getDescriptorForType().getName() + : "null"); + if (preambleCall == null) { + // fall through so later we will skip this response + LOG.warn("Got a negative call id {} but there is no preamble call", id); + } else { + if (remoteExc != null) { + preambleCall.setException(remoteExc); + } else { + finishCall(responseHeader, in, preambleCall); + } return; } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java new file mode 100644 index 00000000000..207188de8c6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/SecurityNotEnabledException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Will be thrown when server received a security preamble call for asking the server principal but + * security is not enabled for this server. + * <p> + * This exception will not be thrown to upper layer so mark it as IA.Private. + */ +@InterfaceAudience.Private +public class SecurityNotEnabledException extends HBaseIOException { + + private static final long serialVersionUID = -3682812966232247662L; + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java index 87b2287a601..4e6f2eab478 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AbstractHBaseSaslRpcClient.java @@ -45,38 +45,38 @@ public abstract class AbstractHBaseSaslRpcClient { /** * Create a HBaseSaslRpcClient for an authentication method - * @param conf the configuration object - * @param provider the authentication provider - * @param token token to use if needed by the authentication method - * @param serverAddr the address of the hbase service - * @param securityInfo the security details for the remote hbase service - * @param fallbackAllowed does the client allow fallback to simple authentication + * @param conf the configuration object + * @param provider the 
authentication provider + * @param token token to use if needed by the authentication method + * @param serverAddr the address of the hbase service + * @param servicePrincipal the service principal to use if needed by the authentication method + * @param fallbackAllowed does the client allow fallback to simple authentication */ protected AbstractHBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider, Token<? extends TokenIdentifier> token, - InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed) throws IOException { - this(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, "authentication"); + InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed) throws IOException { + this(conf, provider, token, serverAddr, servicePrincipal, fallbackAllowed, "authentication"); } /** * Create a HBaseSaslRpcClient for an authentication method - * @param conf configuration object - * @param provider the authentication provider - * @param token token to use if needed by the authentication method - * @param serverAddr the address of the hbase service - * @param securityInfo the security details for the remote hbase service - * @param fallbackAllowed does the client allow fallback to simple authentication - * @param rpcProtection the protection level ("authentication", "integrity" or "privacy") + * @param conf configuration object + * @param provider the authentication provider + * @param token token to use if needed by the authentication method + * @param serverAddr the address of the hbase service + * @param servicePrincipal the service principal to use if needed by the authentication method + * @param fallbackAllowed does the client allow fallback to simple authentication + * @param rpcProtection the protection level ("authentication", "integrity" or "privacy") */ protected AbstractHBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider, Token<? 
extends TokenIdentifier> token, - InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, - String rpcProtection) throws IOException { + InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed, String rpcProtection) + throws IOException { this.fallbackAllowed = fallbackAllowed; saslProps = SaslUtil.initSaslProperties(rpcProtection); saslClient = - provider.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed, saslProps); + provider.createClient(conf, serverAddr, servicePrincipal, token, fallbackAllowed, saslProps); if (saslClient == null) { throw new IOException( "Authentication provider " + provider.getClass() + " returned a null SaslClient"); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java index ace1c38ab22..ebf0a7f875f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java @@ -63,15 +63,15 @@ public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { private boolean initStreamForCrypto; public HBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider, - Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo, + Token<? extends TokenIdentifier> token, InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed) throws IOException { - super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed); + super(conf, provider, token, serverAddr, servicePrincipal, fallbackAllowed); } public HBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider, - Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo, + Token<? 
extends TokenIdentifier> token, InetAddress serverAddr, String servicePrincipal, boolean fallbackAllowed, String rpcProtection, boolean initStreamForCrypto) throws IOException { - super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection); + super(conf, provider, token, serverAddr, servicePrincipal, fallbackAllowed, rpcProtection); this.initStreamForCrypto = initStreamForCrypto; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java index fe5481a10b2..47d380d7104 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java @@ -40,9 +40,9 @@ public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient { private static final Logger LOG = LoggerFactory.getLogger(NettyHBaseSaslRpcClient.class); public NettyHBaseSaslRpcClient(Configuration conf, SaslClientAuthenticationProvider provider, - Token<? extends TokenIdentifier> token, InetAddress serverAddr, SecurityInfo securityInfo, + Token<? 
extends TokenIdentifier> token, InetAddress serverAddr, String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException { - super(conf, provider, token, serverAddr, securityInfo, fallbackAllowed, rpcProtection); + super(conf, provider, token, serverAddr, serverPrincipal, fallbackAllowed, rpcProtection); } public void setupSaslHandler(ChannelPipeline p, String addAfter) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java index cc71355d429..567b5675b71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java @@ -68,14 +68,14 @@ public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler< */ public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi, SaslClientAuthenticationProvider provider, Token<? 
extends TokenIdentifier> token, - InetAddress serverAddr, SecurityInfo securityInfo, boolean fallbackAllowed, Configuration conf) + InetAddress serverAddr, String serverPrincipal, boolean fallbackAllowed, Configuration conf) throws IOException { this.saslPromise = saslPromise; this.ugi = ugi; this.conf = conf; this.provider = provider; this.saslRpcClient = new NettyHBaseSaslRpcClient(conf, provider, token, serverAddr, - securityInfo, fallbackAllowed, conf.get("hbase.rpc.protection", + serverPrincipal, fallbackAllowed, conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase())); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java index 2e16d564695..a33f49573de 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.security; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; import org.apache.hadoop.hbase.shaded.protobuf.generated.BootstrapNodeProtos; @@ -51,7 +55,8 @@ public class SecurityInfo { infos.put(MasterProtos.HbckService.getDescriptor().getName(), new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(), - new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); + new SecurityInfo(Kind.HBASE_AUTH_TOKEN, 
SecurityConstants.MASTER_KRB_PRINCIPAL, + SecurityConstants.REGIONSERVER_KRB_PRINCIPAL)); infos.put(BootstrapNodeProtos.BootstrapNodeService.getDescriptor().getName(), new SecurityInfo(SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); infos.put(LockServiceProtos.LockService.getDescriptor().getName(), @@ -75,16 +80,33 @@ public class SecurityInfo { return infos.get(serviceName); } - private final String serverPrincipal; + private final List<String> serverPrincipals; private final Kind tokenKind; public SecurityInfo(String serverPrincipal, Kind tokenKind) { - this.serverPrincipal = serverPrincipal; + this(tokenKind, serverPrincipal); + } + + public SecurityInfo(Kind tokenKind, String... serverPrincipal) { + Preconditions.checkArgument(serverPrincipal.length > 0); this.tokenKind = tokenKind; + this.serverPrincipals = Arrays.asList(serverPrincipal); } + /** + * Although this class is IA.Private, we leak this class in + * {@code SaslClientAuthenticationProvider}, so need to align with the deprecation cycle for that + * class. + * @deprecated Since 2.5.8 and 2.6.0, will be removed in 4.0.0. Use {@link #getServerPrincipals()} + * instead. 
+ */ + @Deprecated public String getServerPrincipal() { - return serverPrincipal; + return serverPrincipals.get(0); + } + + public List<String> getServerPrincipals() { + return serverPrincipals; } public Kind getTokenKind() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java index 480e724599b..65893c1a75c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslClientAuthenticationProvider.java @@ -31,7 +31,6 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -46,9 +45,9 @@ public class DigestSaslClientAuthenticationProvider extends DigestSaslAuthentica implements SaslClientAuthenticationProvider { @Override - public SaslClient createClient(Configuration conf, InetAddress serverAddr, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, - Map<String, String> saslProps) throws IOException { + public SaslClient createClient(Configuration conf, InetAddress serverAddr, String serverPrincipal, + Token<? 
extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) + throws IOException { return Sasl.createSaslClient(new String[] { getSaslAuthMethod().getSaslMechanism() }, null, null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new DigestSaslClientCallbackHandler(token)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java index 218fd13b60c..77e92b35bd8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslClientAuthenticationProvider.java @@ -24,10 +24,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityConstants; -import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -43,46 +40,10 @@ public class GssSaslClientAuthenticationProvider extends GssSaslAuthenticationPr private static final Logger LOG = LoggerFactory.getLogger(GssSaslClientAuthenticationProvider.class); - private static boolean useCanonicalHostname(Configuration conf) { - return !conf.getBoolean( - SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS, - SecurityConstants.DEFAULT_UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS); - } - - public static String getHostnameForServerPrincipal(Configuration conf, InetAddress addr) { - final String hostname; - - if (useCanonicalHostname(conf)) { - hostname = 
addr.getCanonicalHostName(); - if (hostname.equals(addr.getHostAddress())) { - LOG.warn("Canonical hostname for SASL principal is the same with IP address: " + hostname - + ", " + addr.getHostName() + ". Check DNS configuration or consider " - + SecurityConstants.UNSAFE_HBASE_CLIENT_KERBEROS_HOSTNAME_DISABLE_REVERSEDNS + "=true"); - } - } else { - hostname = addr.getHostName(); - } - - return hostname.toLowerCase(); - } - - String getServerPrincipal(Configuration conf, SecurityInfo securityInfo, InetAddress server) - throws IOException { - String hostname = getHostnameForServerPrincipal(conf, server); - - String serverKey = securityInfo.getServerPrincipal(); - if (serverKey == null) { - throw new IllegalArgumentException( - "Can't obtain server Kerberos config key from SecurityInfo"); - } - return SecurityUtil.getServerPrincipal(conf.get(serverKey), hostname); - } - @Override - public SaslClient createClient(Configuration conf, InetAddress serverAddr, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, - Map<String, String> saslProps) throws IOException { - String serverPrincipal = getServerPrincipal(conf, securityInfo, serverAddr); + public SaslClient createClient(Configuration conf, InetAddress serverAddr, String serverPrincipal, + Token<? 
extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) + throws IOException { LOG.debug("Setting up Kerberos RPC to server={}", serverPrincipal); String[] names = SaslUtil.splitKerberosName(serverPrincipal); if (names.length != 3) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java index bbc5ddac91a..4e23247ca76 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SaslClientAuthenticationProvider.java @@ -31,6 +31,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; /** @@ -45,11 +46,33 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformati public interface SaslClientAuthenticationProvider extends SaslAuthenticationProvider { /** - * Creates the SASL client instance for this auth'n method. + * Creates the SASL client instance for this authentication method. + * @deprecated Since 2.5.8 and 2.6.0. In our own code will not call this method any more, + * customized authentication method should implement + * {@link #createClient(Configuration, InetAddress, String, Token, boolean, Map)} + * instead. Will be removed in 4.0.0. */ - SaslClient createClient(Configuration conf, InetAddress serverAddr, SecurityInfo securityInfo, - Token<? 
extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) - throws IOException; + @Deprecated + default SaslClient createClient(Configuration conf, InetAddress serverAddr, + SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + Map<String, String> saslProps) throws IOException { + throw new UnsupportedOperationException("should not be used any more"); + } + + /** + * Create the SASL client instance for this authentication method. + * <p> + * The default implementation is create a fake {@link SecurityInfo} and call the above method, for + * keeping compatible with old customized authentication method + */ + default SaslClient createClient(Configuration conf, InetAddress serverAddr, + String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + Map<String, String> saslProps) throws IOException { + String principalKey = "hbase.fake.kerberos.principal"; + conf.set(principalKey, serverPrincipal); + return createClient(conf, serverAddr, new SecurityInfo(principalKey, Kind.HBASE_AUTH_TOKEN), + token, fallbackAllowed, saslProps); + } /** * Constructs a {@link UserInformation} from the given {@link UserGroupInformation} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java index 6fff703689c..70e469003c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslClientAuthenticationProvider.java @@ -22,7 +22,6 @@ import java.net.InetAddress; import java.util.Map; import javax.security.sasl.SaslClient; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; 
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -37,7 +36,7 @@ public class SimpleSaslClientAuthenticationProvider extends SimpleSaslAuthentica @Override public SaslClient createClient(Configuration conf, InetAddress serverAddress, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) throws IOException { return null; } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java index 7b42ba224fa..6b1e7c33832 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java @@ -53,10 +53,8 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,9 +78,6 @@ public class TestHBaseSaslRpcClient { private static final Logger LOG = LoggerFactory.getLogger(TestHBaseSaslRpcClient.class); - @Rule - public ExpectedException exception = ExpectedException.none(); - @Test public void testSaslClientUsesGivenRpcProtection() throws Exception { Token<? 
extends TokenIdentifier> token = @@ -90,8 +85,7 @@ public class TestHBaseSaslRpcClient { DigestSaslClientAuthenticationProvider provider = new DigestSaslClientAuthenticationProvider(); for (SaslUtil.QualityOfProtection qop : SaslUtil.QualityOfProtection.values()) { String negotiatedQop = new HBaseSaslRpcClient(HBaseConfiguration.create(), provider, token, - Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false, qop.name(), - false) { + Mockito.mock(InetAddress.class), "", false, qop.name(), false) { public String getQop() { return saslProps.get(Sasl.QOP); } @@ -192,14 +186,14 @@ public class TestHBaseSaslRpcClient { DigestSaslClientAuthenticationProvider provider = new DigestSaslClientAuthenticationProvider() { @Override public SaslClient createClient(Configuration conf, InetAddress serverAddress, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) { return Mockito.mock(SaslClient.class); } }; HBaseSaslRpcClient rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(), provider, - createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), - Mockito.mock(SecurityInfo.class), false); + createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), "", + false); try { rpcClient.getInputStream(); @@ -224,14 +218,14 @@ public class TestHBaseSaslRpcClient { new DigestSaslClientAuthenticationProvider() { @Override public SaslClient createClient(Configuration conf, InetAddress serverAddress, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, - boolean fallbackAllowed, Map<String, String> saslProps) { + String serverPrincipal, Token<? 
extends TokenIdentifier> token, boolean fallbackAllowed, + Map<String, String> saslProps) { return null; } }; new HBaseSaslRpcClient(HBaseConfiguration.create(), provider, - createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), - Mockito.mock(SecurityInfo.class), false); + createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), "", + false); return false; } catch (IOException ex) { return true; @@ -254,8 +248,8 @@ public class TestHBaseSaslRpcClient { try { rpcClient = new HBaseSaslRpcClient(HBaseConfiguration.create(), new DigestSaslClientAuthenticationProvider(), - createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), - Mockito.mock(SecurityInfo.class), false); + createTokenMockWithCredentials(principal, password), Mockito.mock(InetAddress.class), "", + false); } catch (Exception ex) { LOG.error(ex.getMessage(), ex); } @@ -275,7 +269,7 @@ public class TestHBaseSaslRpcClient { private HBaseSaslRpcClient createSaslRpcClientForKerberos() throws IOException { return new HBaseSaslRpcClient(HBaseConfiguration.create(), new GssSaslClientAuthenticationProvider(), createTokenMock(), Mockito.mock(InetAddress.class), - Mockito.mock(SecurityInfo.class), false); + "", false); } private Token<? 
extends TokenIdentifier> createTokenMockWithCredentials(String principal, @@ -291,7 +285,7 @@ public class TestHBaseSaslRpcClient { private HBaseSaslRpcClient createSaslRpcClientSimple() throws IOException { return new HBaseSaslRpcClient(HBaseConfiguration.create(), new SimpleSaslClientAuthenticationProvider(), createTokenMock(), - Mockito.mock(InetAddress.class), Mockito.mock(SecurityInfo.class), false); + Mockito.mock(InetAddress.class), "", false); } @SuppressWarnings("unchecked") diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java index d0930a0f314..3b83d7dda63 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/security/provider/example/ShadeSaslClientAuthenticationProvider.java @@ -31,7 +31,6 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; import org.apache.hadoop.hbase.util.Bytes; @@ -46,9 +45,9 @@ public class ShadeSaslClientAuthenticationProvider extends ShadeSaslAuthenticati implements SaslClientAuthenticationProvider { @Override - public SaslClient createClient(Configuration conf, InetAddress serverAddr, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, - Map<String, String> saslProps) throws IOException { + public SaslClient createClient(Configuration conf, InetAddress serverAddr, String serverPrincipal, + Token<? 
extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) + throws IOException { return Sasl.createSaslClient(new String[] { getSaslAuthMethod().getSaslMechanism() }, null, null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new ShadeSaslClientCallbackHandler(token)); } diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto index e992e681fbf..3e44f8e16fa 100644 --- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto @@ -159,3 +159,7 @@ message ResponseHeader { // If present, then an encoded data block follows. optional CellBlockMeta cell_block_meta = 3; } + +message SecurityPreamableResponse { + required string server_principal = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 0876a1fd55f..a84d132a013 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -67,6 +67,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.gson.Gson; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; @@ -117,6 +118,7 @@ public abstract class RpcServer implements RpcServerInterface, ConfigurationObse LoggerFactory.getLogger("SecurityLogger." 
+ Server.class.getName()); protected SecretManager<TokenIdentifier> secretManager; protected final Map<String, String> saslProps; + protected final String serverPrincipal; protected ServiceAuthorizationManager authManager; @@ -211,7 +213,7 @@ public abstract class RpcServer implements RpcServerInterface, ConfigurationObse protected final RpcScheduler scheduler; - protected UserProvider userProvider; + protected final UserProvider userProvider; protected final ByteBuffAllocator bbAllocator; @@ -300,8 +302,11 @@ public abstract class RpcServer implements RpcServerInterface, ConfigurationObse if (isSecurityEnabled) { saslProps = SaslUtil.initSaslProperties(conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT))); + serverPrincipal = Preconditions.checkNotNull(userProvider.getCurrentUserName(), + "can not get current user name when security is enabled"); } else { saslProps = Collections.emptyMap(); + serverPrincipal = HConstants.EMPTY_STRING; } this.isOnlineLogProviderEnabled = getIsOnlineLogProviderEnabled(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index be97ad582c3..31f46f30c38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.SecurityPreamableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; @@ -695,6 +696,13 @@ abstract class ServerRpcConnection implements Closeable { doRespond(getErrorResponse(msg, e)); } + private void doPreambleResponse(Message resp) throws IOException { + ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); + ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); + BufferChain bufChain = new BufferChain(buf); + doRespond(() -> bufChain); + } + private boolean doConnectionRegistryResponse() throws IOException { if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) { // should be in tests or some scenarios where we should not reach here @@ -710,13 +718,22 @@ abstract class ServerRpcConnection implements Closeable { } GetConnectionRegistryResponse resp = GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build(); - ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); - ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); - BufferChain bufChain = new BufferChain(buf); - doRespond(() -> bufChain); + doPreambleResponse(resp); return true; } + private void doSecurityPreambleResponse() throws IOException { + if (rpcServer.isSecurityEnabled) { + SecurityPreamableResponse resp = SecurityPreamableResponse.newBuilder() + .setServerPrincipal(rpcServer.serverPrincipal).build(); + doPreambleResponse(resp); + } else { + // security is not enabled, do not need a principal when connecting, throw a special exception + // to let client know it should just use simple authentication + doRespond(getErrorResponse("security is not enabled", new SecurityNotEnabledException())); + } + } + protected final void callCleanupIfNeeded() { if (callCleanup != null) { callCleanup.run(); @@ -738,6 +755,13 @@ abstract class ServerRpcConnection implements Closeable { ) { 
return PreambleResponse.CLOSE; } + if ( + ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, + RpcClient.SECURITY_PREAMBLE_HEADER, 0, 6) + ) { + doSecurityPreambleResponse(); + return PreambleResponse.CONTINUE; + } if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) { doBadPreambleHandling( "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java index 9e90a7a3133..1b28c19b430 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -145,7 +145,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection { return count; case CONTINUE: // wait for the next preamble header - preambleBuffer.reset(); + preambleBuffer.clear(); return count; case CLOSE: return -1; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java new file mode 100644 index 00000000000..237f1cb4025 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestMultipleServerPrincipalsIPC.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.security.sasl.SaslException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.security.SecurityInfo; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; 
+import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; + +/** + * Tests for HBASE-28321, where we have multiple server principals candidates for a rpc service. + * <p> + * Put here just because we need to visit some package private classes under this package. + */ +@RunWith(Parameterized.class) +@Category({ SecurityTests.class, MediumTests.class }) +public class TestMultipleServerPrincipalsIPC { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMultipleServerPrincipalsIPC.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String SERVER_PRINCIPAL; + private static String SERVER_PRINCIPAL2; + private static String CLIENT_PRINCIPAL; + + @Parameter(0) + public Class<? extends RpcServer> rpcServerImpl; + + @Parameter(1) + public Class<? 
extends RpcClient> rpcClientImpl; + + private Configuration clientConf; + private Configuration serverConf; + private UserGroupInformation clientUGI; + private UserGroupInformation serverUGI; + private RpcServer rpcServer; + private RpcClient rpcClient; + + @Parameters(name = "{index}: rpcServerImpl={0}, rpcClientImpl={1}") + public static List<Object[]> params() { + List<Object[]> params = new ArrayList<>(); + List<Class<? extends RpcServer>> rpcServerImpls = + Arrays.asList(NettyRpcServer.class, SimpleRpcServer.class); + List<Class<? extends RpcClient>> rpcClientImpls = + Arrays.asList(NettyRpcClient.class, BlockingRpcClient.class); + for (Class<? extends RpcServer> rpcServerImpl : rpcServerImpls) { + for (Class<? extends RpcClient> rpcClientImpl : rpcClientImpls) { + params.add(new Object[] { rpcServerImpl, rpcClientImpl }); + } + } + return params; + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + SERVER_PRINCIPAL = "server/" + HOST + "@" + KDC.getRealm(); + SERVER_PRINCIPAL2 = "server2/" + HOST + "@" + KDC.getRealm(); + CLIENT_PRINCIPAL = "client"; + KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2); + setSecuredConfiguration(TEST_UTIL.getConfiguration()); + TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1); + TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0); + TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10); + } + + @AfterClass + public static void tearDownAfterClass() { + if (KDC != null) { + KDC.stop(); + } + } + + private static void setSecuredConfiguration(Configuration conf) { + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); + conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); + } + + private void loginAndStartRpcServer(String principal, int port) throws 
Exception { + UserGroupInformation.setConfiguration(serverConf); + serverUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, + KEYTAB_FILE.getCanonicalPath()); + rpcServer = serverUGI.doAs((PrivilegedExceptionAction< + RpcServer>) () -> RpcServerFactory.createRpcServer(null, getClass().getSimpleName(), + Lists.newArrayList( + new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), + new InetSocketAddress(HOST, port), serverConf, new FifoRpcScheduler(serverConf, 1))); + rpcServer.start(); + } + + @Before + public void setUp() throws Exception { + clientConf = new Configuration(TEST_UTIL.getConfiguration()); + clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl, + RpcClient.class); + String serverPrincipalConfigName = "hbase.test.multiple.principal.first"; + String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second"; + clientConf.set(serverPrincipalConfigName, SERVER_PRINCIPAL); + clientConf.set(serverPrincipalConfigName2, SERVER_PRINCIPAL2); + serverConf = new Configuration(TEST_UTIL.getConfiguration()); + serverConf.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, + RpcServer.class); + SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2, + serverPrincipalConfigName); + SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo); + + UserGroupInformation.setConfiguration(clientConf); + clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, + KEYTAB_FILE.getCanonicalPath()); + loginAndStartRpcServer(SERVER_PRINCIPAL, 0); + rpcClient = clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory + .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())); + } + + @After + public void tearDown() throws IOException { + Closeables.close(rpcClient, true); + rpcServer.stop(); + } + + private String echo(String msg) throws Exception { + 
return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> { + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( + ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(), + 10000); + TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel); + return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build()) + .getMessage(); + }); + } + + @Test + public void testEcho() throws Exception { + String msg = "Hello World"; + assertEquals(msg, echo(msg)); + } + + @Test + public void testMaliciousServer() throws Exception { + // reset the server principals so the principal returned by server does not match + SecurityInfo securityInfo = + SecurityInfo.getInfo(TestProtobufRpcProto.getDescriptor().getName()); + for (int i = 0; i < securityInfo.getServerPrincipals().size(); i++) { + clientConf.set(securityInfo.getServerPrincipals().get(i), + "valid_server_" + i + "/" + HOST + "@" + KDC.getRealm()); + } + UndeclaredThrowableException error = + assertThrows(UndeclaredThrowableException.class, () -> echo("whatever")); + assertThat(error.getCause(), instanceOf(ServiceException.class)); + assertThat(error.getCause().getCause(), instanceOf(SaslException.class)); + } + + @Test + public void testRememberLastSucceededServerPrincipal() throws Exception { + // after this call we will remember the last succeeded server principal + assertEquals("a", echo("a")); + // shutdown the connection, but does not remove it from pool + RpcConnection conn = + Iterables.getOnlyElement(((AbstractRpcClient<?>) rpcClient).getConnections().values()); + conn.shutdown(); + // recreate rpc server with server principal2 + int port = rpcServer.getListenerAddress().getPort(); + rpcServer.stop(); + serverUGI.logoutUserFromKeytab(); + loginAndStartRpcServer(SERVER_PRINCIPAL2, port); + // this time we will still use the remembered server principal, so we will get a sasl exception + 
UndeclaredThrowableException error = + assertThrows(UndeclaredThrowableException.class, () -> echo("a")); + assertThat(error.getCause(), instanceOf(ServiceException.class)); + // created by IPCUtil.wrap, to prepend the server address + assertThat(error.getCause().getCause(), instanceOf(IOException.class)); + // wrapped by IPCUtil.toIOE + assertThat(error.getCause().getCause().getCause(), instanceOf(IOException.class)); + Throwable cause = error.getCause().getCause().getCause().getCause(); + // for netty rpc client, it is DecoderException, for blocking rpc client, it is already + // RemoteException + assertThat(cause, + either(instanceOf(DecoderException.class)).or(instanceOf(RemoteException.class))); + RemoteException rme; + if (!(cause instanceof RemoteException)) { + assertThat(cause.getCause(), instanceOf(RemoteException.class)); + rme = (RemoteException) cause.getCause(); + } else { + rme = (RemoteException) cause; + } + assertEquals(SaslException.class.getName(), rme.getClassName()); + // the above failure will clear the remembered server principal, so this time we will get the + // correct one. 
We use retry here just because a failure of sasl negotiation will trigger a + // relogin and it may take some time, and for netty based implementation the relogin is async + TEST_UTIL.waitFor(10000, () -> { + try { + echo("a"); + } catch (UndeclaredThrowableException e) { + Throwable t = e.getCause().getCause(); + assertThat(t, instanceOf(IOException.class)); + if (!(t instanceof FailedServerException)) { + // for netty rpc client + assertThat(e.getCause().getMessage(), + containsString(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS)); + } + return false; + } + return true; + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java index bc791754a12..345514396d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -125,8 +126,8 @@ public class TestRpcSkipInitialSaslHandshake { @Test public void test() throws Exception { SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); - Mockito.when(securityInfoMock.getServerPrincipal()) - .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + Mockito.when(securityInfoMock.getServerPrincipals()) + .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL)); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); final AtomicReference<NettyServerRpcConnection> conn = new AtomicReference<>(null); @@ -152,7 +153,6 @@ public class TestRpcSkipInitialSaslHandshake { .getMessage(); assertTrue("test".equals(response)); 
assertFalse(conn.get().useSasl); - } finally { rpcServer.stop(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java index b5e46b5c7cf..a74477bf28c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.net.InetSocketAddress; +import java.util.Collections; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -122,8 +123,8 @@ public class TestSecurityRpcSentBytesMetrics { @Test public void test() throws Exception { SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); - Mockito.when(securityInfoMock.getServerPrincipal()) - .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + Mockito.when(securityInfoMock.getServerPrincipals()) + .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL)); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); NettyRpcServer rpcServer = new NettyRpcServer(null, getClass().getSimpleName(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java index 998896c9468..31e01a98ad6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java @@ -104,7 +104,7 @@ public class AbstractTestSecureIPC { TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 100); } - protected static void stopKDC() throws InterruptedException { + protected static void stopKDC() { 
if (KDC != null) { KDC.stop(); } @@ -192,8 +192,8 @@ public class AbstractTestSecureIPC { return new SaslClientAuthenticationProvider() { @Override public SaslClient createClient(Configuration conf, InetAddress serverAddr, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, - boolean fallbackAllowed, Map<String, String> saslProps) throws IOException { + String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + Map<String, String> saslProps) throws IOException { final String s = conf.get(CANONICAL_HOST_NAME_KEY); if (s != null) { try { @@ -206,7 +206,7 @@ public class AbstractTestSecureIPC { } } - return delegate.createClient(conf, serverAddr, securityInfo, token, fallbackAllowed, + return delegate.createClient(conf, serverAddr, serverPrincipal, token, fallbackAllowed, saslProps); } @@ -385,8 +385,8 @@ public class AbstractTestSecureIPC { */ private void callRpcService(User serverUser, User clientUser) throws Exception { SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); - Mockito.when(securityInfoMock.getServerPrincipal()) - .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + Mockito.when(securityInfoMock.getServerPrincipals()) + .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL)); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); InetSocketAddress isa = new InetSocketAddress(HOST, 0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java new file mode 100644 index 00000000000..6f1cc148204 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestMultipleServerPrincipalsFallbackToSimple.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.security; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.BlockingRpcClient; +import org.apache.hadoop.hbase.ipc.FallbackDisallowedException; +import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; +import org.apache.hadoop.hbase.ipc.NettyRpcClient; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; +import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl; +import 
org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind; + +/** + * Test secure client connecting to a non secure server, where we have multiple server principal + * candidates for a rpc service. See HBASE-28321. + */ +@RunWith(Parameterized.class) +@Category({ SecurityTests.class, MediumTests.class }) +public class TestMultipleServerPrincipalsFallbackToSimple { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMultipleServerPrincipalsFallbackToSimple.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + private static String HOST = "localhost"; + private static String SERVER_PRINCIPAL; + private static String SERVER_PRINCIPAL2; + private static String CLIENT_PRINCIPAL; + + @Parameter + public Class<? 
extends RpcClient> rpcClientImpl; + + private Configuration clientConf; + private UserGroupInformation clientUGI; + private RpcServer rpcServer; + private RpcClient rpcClient; + + @Parameters(name = "{index}: rpcClientImpl={0}") + public static List<Object[]> params() { + return Arrays.asList(new Object[] { NettyRpcClient.class }, + new Object[] { BlockingRpcClient.class }); + } + + private static void setSecuredConfiguration(Configuration conf) { + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos"); + conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + SERVER_PRINCIPAL = "server/" + HOST; + SERVER_PRINCIPAL2 = "server2/" + HOST; + CLIENT_PRINCIPAL = "client"; + KDC.createPrincipal(KEYTAB_FILE, CLIENT_PRINCIPAL, SERVER_PRINCIPAL, SERVER_PRINCIPAL2); + TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxbackoff", 1); + TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 0); + TEST_UTIL.getConfiguration().setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 10); + } + + @Before + public void setUp() throws Exception { + clientConf = new Configuration(TEST_UTIL.getConfiguration()); + setSecuredConfiguration(clientConf); + clientConf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcClientImpl, + RpcClient.class); + String serverPrincipalConfigName = "hbase.test.multiple.principal.first"; + String serverPrincipalConfigName2 = "hbase.test.multiple.principal.second"; + clientConf.set(serverPrincipalConfigName, "server/localhost@" + KDC.getRealm()); + clientConf.set(serverPrincipalConfigName2, "server2/localhost@" + KDC.getRealm()); + SecurityInfo securityInfo = new SecurityInfo(Kind.HBASE_AUTH_TOKEN, serverPrincipalConfigName2, + serverPrincipalConfigName); + 
SecurityInfo.addInfo(TestProtobufRpcProto.getDescriptor().getName(), securityInfo); + + UserGroupInformation.setConfiguration(clientConf); + clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, + KEYTAB_FILE.getCanonicalPath()); + + rpcServer = RpcServerFactory.createRpcServer(null, getClass().getSimpleName(), + Lists.newArrayList( + new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), + new InetSocketAddress(HOST, 0), TEST_UTIL.getConfiguration(), + new FifoRpcScheduler(TEST_UTIL.getConfiguration(), 1)); + rpcServer.start(); + } + + @After + public void tearDown() throws IOException { + Closeables.close(rpcClient, true); + rpcServer.stop(); + } + + private RpcClient createClient() throws Exception { + return clientUGI.doAs((PrivilegedExceptionAction<RpcClient>) () -> RpcClientFactory + .createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())); + } + + private String echo(String msg) throws Exception { + return clientUGI.doAs((PrivilegedExceptionAction<String>) () -> { + BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( + ServerName.valueOf(HOST, rpcServer.getListenerAddress().getPort(), -1), User.getCurrent(), + 10000); + TestProtobufRpcProto.BlockingInterface stub = TestProtobufRpcProto.newBlockingStub(channel); + return stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(msg).build()) + .getMessage(); + }); + } + + @Test + public void testAllowFallbackToSimple() throws Exception { + clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, true); + rpcClient = createClient(); + assertEquals("allow", echo("allow")); + } + + @Test + public void testDisallowFallbackToSimple() throws Exception { + clientConf.setBoolean(RpcClient.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, false); + rpcClient = createClient(); + UndeclaredThrowableException error = + assertThrows(UndeclaredThrowableException.class, () -> echo("disallow")); + Throwable 
cause = error.getCause().getCause().getCause(); + assertThat(cause, instanceOf(FallbackDisallowedException.class)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java index a6984fcdf3a..ea9b6948011 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSaslTlsIPCRejectPlainText.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosP import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration; import java.io.File; +import java.util.Collections; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl; @@ -66,8 +67,8 @@ public class TestSaslTlsIPCRejectPlainText extends AbstractTestTlsRejectPlainTex UGI = loginKerberosPrincipal(KEYTAB_FILE.getCanonicalPath(), PRINCIPAL); setSecuredConfiguration(util.getConfiguration()); SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); - Mockito.when(securityInfoMock.getServerPrincipal()) - .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + Mockito.when(securityInfoMock.getServerPrincipals()) + .thenReturn(Collections.singletonList(HBaseKerberosUtils.KRB_PRINCIPAL)); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java index feba17364cc..66b65ba03f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/provider/CustomSaslAuthenticationProviderTestBase.java @@ -75,7 +75,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.SecureTestCluster; import org.apache.hadoop.hbase.security.token.TokenProvider; @@ -202,7 +201,7 @@ public abstract class CustomSaslAuthenticationProviderTestBase { @Override public SaslClient createClient(Configuration conf, InetAddress serverAddr, - SecurityInfo securityInfo, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, + String serverPrincipal, Token<? extends TokenIdentifier> token, boolean fallbackAllowed, Map<String, String> saslProps) throws IOException { return Sasl.createSaslClient(new String[] { MECHANISM }, null, null, SaslUtil.SASL_DEFAULT_REALM, saslProps, new InMemoryClientProviderCallbackHandler(token));