HTHou commented on code in PR #15994: URL: https://github.com/apache/iotdb/pull/15994#discussion_r2313565919
########## iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonblockingTransport.java: ########## @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.rpc; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.security.KeyStore; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A non-blocking Thrift transport implementation using Netty for asynchronous I/O. Integrates with + * Thrift's TAsyncClientManager using a dummy local SocketChannel for selector events. Supports + * SSL/TLS for secure communication. + */ +public class NettyTNonblockingTransport extends TNonblockingTransport { + + private static final Logger logger = LoggerFactory.getLogger(NettyTNonblockingTransport.class); + + private final String host; + private final int port; + private final int connectTimeoutMs; + private final long sslHandshakeTimeoutMs; + private final EventLoopGroup eventLoopGroup; + private final Bootstrap bootstrap; + private volatile Channel channel; + private final AtomicBoolean connected = new AtomicBoolean(false); + private final AtomicBoolean connecting = new AtomicBoolean(false); + private ChannelFuture future; + private final LinkedBlockingQueue<ByteBuf> readQueue = new LinkedBlockingQueue<>(); + private final Object lock = new Object(); + + // SSL configuration + private final String keystorePath; + private final String keystorePassword; + private final String truststorePath; + private final String truststorePassword; + + // Dummy local socket for selector integration + private ServerSocketChannel dummyServer; + private java.nio.channels.SocketChannel dummyClient; + private java.nio.channels.SocketChannel dummyServerAccepted; + private int dummyPort; + private Selector selector; // Stored for wakeup if needed + + 
public NettyTNonblockingTransport( + String host, + int port, + int connectTimeoutMs, + String keystorePath, + String keystorePassword, + String truststorePath, + String truststorePassword) + throws TTransportException { + super(new TConfiguration()); + this.host = host; + this.port = port; + this.connectTimeoutMs = connectTimeoutMs; + this.sslHandshakeTimeoutMs = connectTimeoutMs; + this.eventLoopGroup = new NioEventLoopGroup(); + this.bootstrap = new Bootstrap(); + this.keystorePath = keystorePath; + this.keystorePassword = keystorePassword; + this.truststorePath = truststorePath; + this.truststorePassword = truststorePassword; + initDummyChannels(); + initBootstrap(); + } + + /** Initializes dummy local channels for selector event simulation. */ + private void initDummyChannels() throws TTransportException { + try { + dummyServer = ServerSocketChannel.open(); + dummyServer.configureBlocking(false); + dummyServer.bind(new InetSocketAddress("localhost", 0)); + dummyPort = dummyServer.socket().getLocalPort(); + if (logger.isDebugEnabled()) { + logger.debug("Dummy server bound to localhost:{}", dummyPort); + } + dummyClient = java.nio.channels.SocketChannel.open(); + dummyClient.configureBlocking(false); + } catch (IOException e) { + throw new TTransportException("Failed to initialize dummy channels", e); + } + } + + private void initBootstrap() { + bootstrap + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) + .handler( + new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Initializing channel for {}:{}", host, port); + } + ChannelPipeline pipeline = ch.pipeline(); + SslContext sslContext = createSslContext(); + SslHandler sslHandler = sslContext.newHandler(ch.alloc(), host, port); + 
sslHandler.setHandshakeTimeoutMillis(sslHandshakeTimeoutMs); + // set this for avoiding error log on the server side + sslHandler.setCloseNotifyReadTimeoutMillis(100); + + pipeline.addLast("ssl", sslHandler); + sslHandler + .handshakeFuture() + .addListener( + future -> { + if (future.isSuccess()) { + if (logger.isDebugEnabled()) { + logger.debug( + "SSL handshake completed successfully for {}:{}", host, port); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug( + "SSL handshake failed for {}:{}: {}", + host, + port, + future.cause().getMessage()); + } Review Comment: SSL handshake failures may happen when some nodes are restarted. With the normal TNonblockingTransport, all such failure logs are at debug level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
