This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push: new 69fcce20c74 HBASE-27924 Remove duplicate code for NettyHBaseSaslRpcServerHandler … (#5285) 69fcce20c74 is described below commit 69fcce20c7411988e0c7e6d8c76a9e1010d82ce1 Author: chenglei <cheng...@apache.org> AuthorDate: Fri Jun 16 23:36:23 2023 +0800 HBASE-27924 Remove duplicate code for NettyHBaseSaslRpcServerHandler … (#5285) Co-authored-by: comnetwork <comnetw...@163.com> Signed-off-by: Duo Zhang <zhang...@apache.org> (cherry picked from commit 0703d36daf8dd5c36164419032ff0760bb3f65cc) --- .../hbase/ipc/NettyHBaseSaslRpcServerHandler.java | 25 +--- .../hbase/ipc/TestSecurityRpcSentBytesMetrics.java | 155 +++++++++++++++++++++ 2 files changed, 157 insertions(+), 23 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java index cb7a173625e..dd6f84daae3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyHBaseSaslRpcServerHandler.java @@ -17,20 +17,16 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslUnwrapHandler; import org.apache.hadoop.hbase.security.SaslWrapHandler; import org.apache.hadoop.hbase.util.NettyFutureUtils; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; -import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; @@ -54,23 +50,6 @@ class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf this.conn = conn; } - private void doResponse(ChannelHandlerContext ctx, SaslStatus status, Writable rv, - String errorClass, String error) throws IOException { - // In my testing, have noticed that sasl messages are usually - // in the ballpark of 100-200. That's why the initial capacity is 256. - ByteBuf resp = ctx.alloc().buffer(256); - try (ByteBufOutputStream out = new ByteBufOutputStream(resp)) { - out.writeInt(status.state); // write status - if (status == SaslStatus.SUCCESS) { - rv.write(out); - } else { - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); - } - } - NettyFutureUtils.safeWriteAndFlush(ctx, resp); - } - @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { LOG.debug("Read input token of size={} for processing by saslServer.evaluateResponse()", @@ -81,7 +60,7 @@ class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf byte[] replyToken = saslServer.evaluateResponse(saslToken); if (replyToken != null) { LOG.debug("Will send token of size {} from saslServer.", replyToken.length); - doResponse(ctx, SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null); + conn.doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null, null); } if (saslServer.isComplete()) { conn.finishSaslNegotiation(); @@ -105,7 +84,7 @@ class NettyHBaseSaslRpcServerHandler extends SimpleChannelInboundHandler<ByteBuf public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.error("Error when doing SASL handshade, provider={}", conn.provider, cause); Throwable sendToClient = HBaseSaslRpcServer.unwrap(cause); - doResponse(ctx, SaslStatus.ERROR, null, sendToClient.getClass().getName(), + conn.doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(), sendToClient.getLocalizedMessage()); rpcServer.metrics.authenticationFailure(); String clientIP = this.toString(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java new file mode 100644 index 00000000000..16c7b6fc71a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSecurityRpcSentBytesMetrics.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.loginKerberosPrincipal; +import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.setSecuredConfiguration; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.InetSocketAddress; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.SecurityInfo; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.SecurityTests; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; + +@Category({ SecurityTests.class, MediumTests.class }) +public class TestSecurityRpcSentBytesMetrics { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSecurityRpcSentBytesMetrics.class); + + protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected static final File KEYTAB_FILE = + new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + protected static MiniKdc KDC; + protected static String HOST = "localhost"; + protected static String PRINCIPAL; + + protected String krbKeytab; + protected String krbPrincipal; + protected UserGroupInformation ugi; + protected Configuration clientConf; + protected Configuration serverConf; + + protected static void initKDCAndConf() throws Exception { + KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE); + PRINCIPAL = "hbase/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + // set a smaller timeout and retry to speed up tests + TEST_UTIL.getConfiguration().setInt(RpcClient.SOCKET_TIMEOUT_READ, 2000000000); + TEST_UTIL.getConfiguration().setInt("hbase.security.relogin.maxretries", 1); + } + + protected static void stopKDC() throws InterruptedException { + if (KDC != null) { + KDC.stop(); + } + } + + protected final void setUpPrincipalAndConf() throws Exception { + krbKeytab = getKeytabFileForTesting(); + krbPrincipal = getPrincipalForTesting(); + ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal); + clientConf = new Configuration(TEST_UTIL.getConfiguration()); + setSecuredConfiguration(clientConf); + serverConf = new Configuration(TEST_UTIL.getConfiguration()); + setSecuredConfiguration(serverConf); + } + + @BeforeClass + public static void setUp() throws Exception { + initKDCAndConf(); + } + + @AfterClass + public static void tearDown() throws Exception { + stopKDC(); + TEST_UTIL.cleanupTestDir(); + } + + @Before + public void setUpTest() throws Exception { + setUpPrincipalAndConf(); + } + + /** + * This test is for HBASE-27924, before this JIRA, bytes sent by + * {@link NettyHBaseSaslRpcServerHandler} is ignored by {@link MetricsHBaseServer#sentBytes}. + */ + @Test + public void test() throws Exception { + SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class); + Mockito.when(securityInfoMock.getServerPrincipal()) + .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); + SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); + + NettyRpcServer rpcServer = new NettyRpcServer(null, getClass().getSimpleName(), + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress(HOST, 0), serverConf, new FifoRpcScheduler(serverConf, 1), true) { + + @Override + public void start() { + metrics = Mockito.spy(metrics); + super.start(); + } + }; + + rpcServer.start(); + try (NettyRpcClient rpcClient = + new NettyRpcClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString(), null, null)) { + BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(), + User.create(UserGroupInformation.getCurrentUser())); + + String response = + stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build()) + .getMessage(); + assertTrue("test".equals(response)); + } finally { + rpcServer.stop(); + } + Mockito.verify(rpcServer.metrics, Mockito.atLeast(2)).sentBytes(Mockito.anyLong()); + } +}