This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new c5efbd92d9 optimize: Add `ChannelEventListener` support to prevent memory leaks (#7337) c5efbd92d9 is described below commit c5efbd92d97b63ef382c17fcab84719ab3b41a77 Author: Yongjun Hong <dev.yongj...@gmail.com> AuthorDate: Mon May 19 18:38:40 2025 +0900 optimize: Add `ChannelEventListener` support to prevent memory leaks (#7337) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../org/apache/seata/core/rpc/RemotingClient.java | 15 ++ .../rpc/netty/AbstractNettyRemotingClient.java | 206 ++++++++++++++- .../seata/core/rpc/netty/ChannelEventHandler.java | 107 ++++++++ .../seata/core/rpc/netty/ChannelEventListener.java | 15 +- ...nelEventListener.java => ChannelEventType.java} | 36 +-- .../core/rpc/netty/RmNettyRemotingClient.java | 20 ++ .../core/rpc/netty/TmNettyRemotingClient.java | 20 ++ .../netty/ChannelEventHandlerIntegrationTest.java | 282 +++++++++++++++++++++ .../core/rpc/netty/ChannelEventListenerTest.java | 168 ++++++++++++ .../seata/core/rpc/netty/ResourceCleanupTest.java | 140 ++++++++++ 12 files changed, 974 insertions(+), 37 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 296eec3bae..e17a54162d 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -35,6 +35,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7282](https://github.com/apache/incubator-seata/pull/7282)] optimize unexpected NullPointerException in lookup method in FileRegistryServiceImpl class - [[#7310](https://github.com/seata/seata/pull/7310)] Optimize minor issues in the naming-server - [[#7329](https://github.com/apache/incubator-seata/pull/7329)] upgrade tomcat to 9.0.100 +- [[#7337](https://github.com/apache/incubator-seata/pull/7337)] Add ChannelEventListener support to prevent memory leaks ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 941ac2d168..98887b0bbe 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -34,6 +34,7 @@ - [[#7282](https://github.com/apache/incubator-seata/pull/7282)] 优化FileRegistryServiceImpl类lookup的NullPointerException问题 - [[#7310](https://github.com/seata/seata/pull/7310)] 优化naming-server中的一些小问题 - [[#7329](https://github.com/apache/incubator-seata/pull/7329)] 将 tomcat 升级到 9.0.100 +- [[#7337](https://github.com/apache/incubator-seata/pull/7337)] 添加 ChannelEventListener 支持以防止内存泄漏 ### security: diff --git a/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java index e9add834a9..1876b2fb11 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException; import io.netty.channel.Channel; import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.rpc.netty.ChannelEventListener; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.core.rpc.processor.RemotingProcessor; @@ -101,4 +102,18 @@ public interface RemotingClient { * @param executor thread pool */ void registerProcessor(final int messageType, final RemotingProcessor processor, final ExecutorService executor); + + /** + * register channel event listener + * + * @param channelEventListener {@link ChannelEventListener} + */ + void registerChannelEventListener(ChannelEventListener channelEventListener); + + /** + * unregister channel event listener + * + * @param channelEventListener {@link ChannelEventListener} + */ + void unregisterChannelEventListener(ChannelEventListener channelEventListener); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 8618c3030b..2867cca2b6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -16,12 +16,18 @@ */ package org.apache.seata.core.rpc.netty; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelId; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; @@ -82,6 +88,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L; private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L; private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend"; + + private final CopyOnWriteArrayList<ChannelEventListener> channelEventListeners = new CopyOnWriteArrayList<>(); + protected final Object mergeLock = new Object(); /** @@ -130,7 +139,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting super(messageExecutor); this.transactionRole = transactionRole; clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole); - clientBootstrap.setChannelHandlers(new ClientHandler()); + clientBootstrap.setChannelHandlers(new ClientHandler(), new ChannelEventHandler(this)); clientChannelManager = new NettyClientChannelManager( new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig); } @@ -336,6 +345,201 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting */ protected abstract long getRpcRequestTimeout(); + + /** + * Registers a channel event listener to receive channel events. + * If the listener is already registered, it will not be added again. + * + * @param channelEventListener the channel event listener to register + */ + @Override + public void registerChannelEventListener(ChannelEventListener channelEventListener) { + if (channelEventListener != null) { + channelEventListeners.addIfAbsent(channelEventListener); + LOGGER.info("register channel event listener: {}", channelEventListener.getClass().getName()); + } + } + + /** + * Unregisters a previously registered channel event listener. + * + * @param channelEventListener the channel event listener to unregister + */ + @Override + public void unregisterChannelEventListener(ChannelEventListener channelEventListener) { + if (channelEventListener != null) { + channelEventListeners.remove(channelEventListener); + LOGGER.info("unregister channel event listener: {}", channelEventListener.getClass().getName()); + } + } + + /** + * Handles channel active events from Netty. + * Fires a CONNECTED event to all registered listeners. + * + * @param channel the channel that became active + */ + public void onChannelActive(Channel channel) { + fireChannelEvent(channel, ChannelEventType.CONNECTED); + } + + /** + * Handles channel inactive events from Netty. + * Fires a DISCONNECTED event to all registered listeners and cleans up resources. + * + * @param channel the channel that became inactive + */ + public void onChannelInactive(Channel channel) { + fireChannelEvent(channel, ChannelEventType.DISCONNECTED); + cleanupResourcesForChannel(channel); + } + + /** + * Handles channel exception events from Netty. + * Fires an EXCEPTION event to all registered listeners and cleans up resources. + * + * @param channel the channel where the exception occurred + * @param cause the throwable that represents the exception + */ + public void onChannelException(Channel channel, Throwable cause) { + fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause); + cleanupResourcesForChannel(channel); + } + + /** + * Handles channel idle events from Netty. + * Fires an IDLE event to all registered listeners. + * + * @param channel the channel that became idle + */ + public void onChannelIdle(Channel channel) { + fireChannelEvent(channel, ChannelEventType.IDLE); + } + + /** + * Cleans up resources associated with a channel that has been disconnected. + * This includes collecting message IDs for the channel and cleaning up their futures. + * + * @param channel the channel for which resources should be cleaned up + */ + protected void cleanupResourcesForChannel(Channel channel) { + if (channel == null) { + return; + } + ChannelException cause = new ChannelException( + String.format("Channel disconnected: %s", channel.remoteAddress())); + + Set<Integer> messageIds = collectMessageIdsForChannel(channel.id()); + cleanupFuturesForMessageIds(messageIds, cause); + + LOGGER.info("Cleaned up {} pending requests for disconnected channel: {}", + messageIds.size(), channel.remoteAddress()); + } + + /** + * Collects message IDs associated with a specific channel. + * This is used during channel cleanup to identify pending requests. + * + * @param channelId the ID of the channel + * @return a set of message IDs associated with the channel + */ + private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) { + Set<Integer> messageIds = new HashSet<>(); + + String serverAddress = null; + for (Map.Entry<String, Channel> entry : clientChannelManager.getChannels().entrySet()) { + Channel channel = entry.getValue(); + if (channelId.equals(channel.id())) { + serverAddress = entry.getKey(); + break; + } + } + + if (serverAddress == null) { + return messageIds; + } + + Iterator<Map.Entry<Integer, MergeMessage>> it = mergeMsgMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Integer, MergeMessage> entry = it.next(); + MergeMessage mergeMessage = entry.getValue(); + + if (mergeMessage instanceof MergedWarpMessage) { + MergedWarpMessage warpMessage = (MergedWarpMessage) mergeMessage; + + BlockingQueue<RpcMessage> basket = basketMap.get(serverAddress); + if (basket != null && !basket.isEmpty()) { + messageIds.addAll(warpMessage.msgIds); + it.remove(); + } + } + } + + return messageIds; + } + + /** + * Cleans up futures for a set of message IDs. + * This completes futures with an exception to notify waiting threads. + * + * @param messageIds the set of message IDs whose futures should be cleaned up + * @param cause the exception to set as the result for each future + */ + private void cleanupFuturesForMessageIds(Set<Integer> messageIds, Exception cause) { + for (Integer messageId : messageIds) { + MessageFuture future = futures.remove(messageId); + if (future != null) { + future.setResultMessage(cause); + } + } + } + + /** + * Fires a channel event without an associated cause. + * This is an overloaded version that calls {@link #fireChannelEvent(Channel, ChannelEventType, Throwable)} + * with a null cause. + * + * @param channel the channel associated with the event + * @param eventType the type of event that occurred + */ + protected void fireChannelEvent(Channel channel, ChannelEventType eventType) { + fireChannelEvent(channel, eventType, null); + } + + /** + * Fires a channel event to all registered listeners. + * This method dispatches the event to the appropriate method on each listener + * based on the event type. + * + * @param channel the channel associated with the event + * @param eventType the type of event that occurred + * @param cause the cause of the event (maybe null for certain event types) + */ + protected void fireChannelEvent(Channel channel, ChannelEventType eventType, Throwable cause) { + for (ChannelEventListener listener : channelEventListeners) { + try { + switch (eventType) { + case CONNECTED: + listener.onChannelConnected(channel); + break; + case DISCONNECTED: + listener.onChannelDisconnected(channel); + break; + case EXCEPTION: + listener.onChannelException(channel, cause); + break; + case IDLE: + listener.onChannelIdle(channel); + break; + default: + break; + } + } catch (Exception e) { + LOGGER.warn("Error while firing channel {} event", eventType, e); + } + } + } + /** * The type Merged send runnable. */ diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java new file mode 100644 index 0000000000..ea983330ca --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handler class for Netty channel events. + * Detects channel activation, deactivation, exceptions, and idle state events + * and forwards these events to the AbstractNettyRemotingClient. + */ +@Sharable +public class ChannelEventHandler extends ChannelDuplexHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelEventHandler.class); + private final AbstractNettyRemotingClient remotingClient; + + public ChannelEventHandler(AbstractNettyRemotingClient remotingClient) { + this.remotingClient = remotingClient; + } + + /** + * Called when a channel becomes active. + * Logs the channel activation event and notifies the remoting client. + * + * @param ctx the channel handler context + * @throws Exception if an exception occurs + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Channel active: {}", ctx.channel().remoteAddress()); + } + remotingClient.onChannelActive(ctx.channel()); + super.channelActive(ctx); + } + + /** + * Called when a channel becomes inactive. + * Logs the channel deactivation event and notifies the remoting client. + * + * @param ctx the channel handler context + * @throws Exception if an exception occurs + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + LOGGER.warn("Channel inactive: {}", channel.remoteAddress()); + remotingClient.onChannelInactive(channel); + super.channelInactive(ctx); + } + + /** + * Called when an exception occurs in a channel. + * Logs the exception event and notifies the remoting client. + * + * @param ctx the channel handler context + * @param cause the thrown exception + * @throws Exception if an exception occurs + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Channel channel = ctx.channel(); + LOGGER.warn("Channel exception: {}, cause: {}", channel.remoteAddress(), cause.getMessage()); + remotingClient.onChannelException(channel, cause); + super.exceptionCaught(ctx, cause); + } + + /** + * Called when a user event is triggered. + * Primarily handles IdleStateEvent, logs the event and notifies the remoting client. + * + * @param ctx the channel handler context + * @param evt the triggered event + * @throws Exception if an exception occurs + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Channel idle: {}", ctx.channel().remoteAddress()); + } + remotingClient.onChannelIdle(ctx.channel()); + } + super.userEventTriggered(ctx, evt); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java index 958451a9e4..c654635824 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java @@ -20,39 +20,34 @@ import io.netty.channel.Channel; /** * The interface Channel event listener. - * */ -@Deprecated public interface ChannelEventListener { /** * On channel connect. * - * @param remoteAddr the remote addr * @param channel the channel */ - void onChannelConnect(final String remoteAddr, final Channel channel); + default void onChannelConnected(final Channel channel) {} /** * On channel close. * - * @param remoteAddr the remote addr * @param channel the channel */ - void onChannelClose(final String remoteAddr, final Channel channel); + default void onChannelDisconnected(final Channel channel) {} /** * On channel exception. * - * @param remoteAddr the remote addr * @param channel the channel + * @param cause the cause */ - void onChannelException(final String remoteAddr, final Channel channel); + default void onChannelException(final Channel channel, Throwable cause) {} /** * On channel idle. * - * @param remoteAddr the remote addr * @param channel the channel */ - void onChannelIdle(final String remoteAddr, final Channel channel); + default void onChannelIdle(final Channel channel) {} } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java similarity index 51% copy from core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java copy to core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java index 958451a9e4..c130d9f1d6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java @@ -16,43 +16,27 @@ */ package org.apache.seata.core.rpc.netty; -import io.netty.channel.Channel; - /** - * The interface Channel event listener. - * + * Enum representing different types of channel events. */ -@Deprecated -public interface ChannelEventListener { +public enum ChannelEventType { /** - * On channel connect. - * - * @param remoteAddr the remote addr - * @param channel the channel + * Channel connected. */ - void onChannelConnect(final String remoteAddr, final Channel channel); + CONNECTED, /** - * On channel close. - * - * @param remoteAddr the remote addr - * @param channel the channel + * Channel disconnected. */ - void onChannelClose(final String remoteAddr, final Channel channel); + DISCONNECTED, /** - * On channel exception. - * - * @param remoteAddr the remote addr - * @param channel the channel + * Channel exception. */ - void onChannelException(final String remoteAddr, final Channel channel); + EXCEPTION, /** - * On channel idle. - * - * @param remoteAddr the remote addr - * @param channel the channel + * Channel idle. */ - void onChannelIdle(final String remoteAddr, final Channel channel); + IDLE } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 872cfa2b2b..cccf894fc0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -86,6 +86,26 @@ public final class RmNettyRemotingClient extends AbstractNettyRemotingClient { getClientChannelManager().initReconnect(transactionServiceGroup, failFast); } } + + registerChannelEventListener(new ChannelEventListener() { + @Override public void onChannelConnected(Channel channel) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Channel active: {}", channel.remoteAddress()); + } + } + + @Override public void onChannelDisconnected(Channel channel) { + LOGGER.warn("Channel inactive: {}", channel.remoteAddress()); + } + + @Override public void onChannelException(Channel channel, Throwable cause) { + LOGGER.error("Channel exception: {}", channel.remoteAddress(), cause); + } + + @Override public void onChannelIdle(Channel channel) { + LOGGER.warn("Channel idle: {}", channel.remoteAddress()); + } + }); } private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 4873f8c347..9ee68f31ef 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -84,6 +84,26 @@ public final class TmNettyRemotingClient extends AbstractNettyRemotingClient { } } }); + + registerChannelEventListener(new ChannelEventListener() { + @Override public void onChannelConnected(Channel channel) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Channel active: {}", channel.remoteAddress()); + } + } + + @Override public void onChannelDisconnected(Channel channel) { + LOGGER.warn("Channel inactive: {}", channel.remoteAddress()); + } + + @Override public void onChannelException(Channel channel, Throwable cause) { + LOGGER.error("Channel exception: {}", channel.remoteAddress(), cause); + } + + @Override public void onChannelIdle(Channel channel) { + LOGGER.warn("Channel idle: {}", channel.remoteAddress()); + } + }); } /** diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java new file mode 100644 index 0000000000..ccf848c377 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.GlobalEventExecutor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ChannelEventHandlerIntegrationTest { + + private static final int SERVER_PORT = 8091; + private static final String SERVER_HOST = "127.0.0.1"; + private static final int TIMEOUT_SECONDS = 5; + + private static EventLoopGroup bossGroup; + private static EventLoopGroup workerGroup; + private static Channel serverChannel; + + @Mock + private AbstractNettyRemotingClient mockRemotingClient; + + @Captor + private ArgumentCaptor<Channel> channelCaptor; + + @Captor + private ArgumentCaptor<Throwable> throwableCaptor; + + private ChannelEventHandler channelEventHandler; + private EventLoopGroup clientGroup; + private Channel clientChannel; + private CountDownLatch channelActiveLatch; + private CountDownLatch channelInactiveLatch; + private CountDownLatch exceptionCaughtLatch; + private CountDownLatch idleEventLatch; + + @BeforeAll + static void setupClass() throws InterruptedException { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, TimeUnit.SECONDS)); + } + }); + + serverChannel = serverBootstrap.bind(SERVER_PORT).sync().channel(); + } + + @AfterAll + static void tearDownClass() { + if (serverChannel != null) { + serverChannel.close(); + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + + @BeforeEach + void setUp() { + channelEventHandler = new ChannelEventHandler(mockRemotingClient); + + clientGroup = new NioEventLoopGroup(); + channelActiveLatch = new CountDownLatch(1); + channelInactiveLatch = new CountDownLatch(1); + exceptionCaughtLatch = new CountDownLatch(1); + idleEventLatch = new CountDownLatch(1); + + lenient() + .doAnswer(invocation -> { + channelActiveLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelActive(any(Channel.class)); + + lenient() + .doAnswer(invocation -> { + channelInactiveLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelInactive(any(Channel.class)); + + lenient() + .doAnswer(invocation -> { + exceptionCaughtLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelException(any(Channel.class), any(Throwable.class)); + + lenient() + .doAnswer(invocation -> { + idleEventLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelIdle(any(Channel.class)); + } + + @AfterEach + void tearDown() { + if (clientChannel != null) { + clientChannel.close(); + } + if (clientGroup != null) { + clientGroup.shutdownGracefully(); + } + } + + @Test + void testChannelActive() throws Exception { + connectClient(); + + assertTrue( + channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel activation event was not detected"); + + verify(mockRemotingClient).onChannelActive(channelCaptor.capture()); + Channel capturedChannel = channelCaptor.getValue(); + assertNotNull(capturedChannel); + + SocketAddress remoteAddress = capturedChannel.remoteAddress(); + assertInstanceOf(InetSocketAddress.class, remoteAddress); + + InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress; + assertEquals(SERVER_HOST, inetAddress.getHostString()); + assertEquals(SERVER_PORT, inetAddress.getPort()); + } + + @Test + void testChannelInactive() throws Exception { + connectClient(); + + clientChannel.close().sync(); + + assertTrue( + channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel deactivation event was not detected"); + + verify(mockRemotingClient).onChannelInactive(any(Channel.class)); + } + + @Test + void testChannelInactiveByServer() throws Exception { + connectClient(); + + DefaultChannelGroup serverChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + serverChannels.addAll(collectServerChannels(workerGroup)); + Channel serverSideClientChannel = serverChannels.stream() + .filter(ch -> ch.isActive() && ch.remoteAddress() != null) + .findFirst() + .orElseThrow(() -> new AssertionError("Failed to find client channel on server side")); + + serverSideClientChannel.close().sync(); + assertTrue( + channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel inactive event was not detected on client side when server closed the connection"); + verify(mockRemotingClient).onChannelInactive(any(Channel.class)); + } + + @Test + void testExceptionCaught() throws Exception { + connectClient(); + + RuntimeException testException = new RuntimeException("Test exception"); + clientChannel.pipeline().fireExceptionCaught(testException); + + assertTrue(exceptionCaughtLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), "Exception event was not detected"); + + verify(mockRemotingClient).onChannelException(any(Channel.class), throwableCaptor.capture()); + + Throwable capturedException = throwableCaptor.getValue(); + assertNotNull(capturedException); + } + + @Test + void testChannelIdle() throws Exception { + connectClient(500); + + assertTrue(idleEventLatch.await(3, TimeUnit.SECONDS), "Idle event was not detected"); + + verify(mockRemotingClient).onChannelIdle(any(Channel.class)); + } + + private void connectClient() throws InterruptedException { + connectClient(0); + } + + private void connectClient(int idleTimeoutMillis) throws InterruptedException { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(clientGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + if (idleTimeoutMillis > 0) { + pipeline.addLast(new IdleStateHandler(0, idleTimeoutMillis, 0, TimeUnit.MILLISECONDS)); + } + pipeline.addLast(channelEventHandler); + } + }); + + ChannelFuture future = bootstrap.connect(SERVER_HOST, SERVER_PORT).sync(); + clientChannel = future.channel(); + assertTrue(clientChannel.isActive()); + } + + private DefaultChannelGroup collectServerChannels(EventLoopGroup workerGroup) throws InterruptedException { + DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + for (EventExecutor executor : workerGroup) { + if (executor instanceof SingleThreadEventLoop) { + SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) executor; + + executor.submit(() -> { + Iterator<Channel> it = eventLoop.registeredChannelsIterator(); + while (it.hasNext()) { + Channel ch = it.next(); + if (ch.isActive() && ch instanceof SocketChannel) { + channels.add(ch); + } + } + return null; + }) + .sync(); + } + } + return channels; + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java new file mode 100644 index 0000000000..14534c5cba --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class ChannelEventListenerTest { + + private AbstractNettyRemotingClient client; + + @Mock + private Channel channel; + + private TestChannelEventListener testListener; + + @BeforeEach + void setUp() { + client = TmNettyRemotingClient.getInstance(); + testListener = new TestChannelEventListener(); + client.registerChannelEventListener(testListener); + } + + @Test + void testRegisterAndUnregisterListener() { + client.onChannelActive(channel); + + assertTrue(testListener.isConnectedCalled()); + assertEquals(channel, testListener.getChannel()); + + client.unregisterChannelEventListener(testListener); + testListener.reset(); + client.onChannelActive(channel); + + assertFalse(testListener.isConnectedCalled()); + assertNotEquals(channel, testListener.getChannel()); + assertNull(testListener.getChannel()); + } + + @Test + void testChannelConnectedEvent() { + client.onChannelActive(channel); + + assertTrue(testListener.isConnectedCalled()); + assertEquals(channel, testListener.getChannel()); + } + + @Test + void testChannelIdleEvent() { + client.onChannelIdle(channel); + + assertTrue(testListener.isIdleCalled()); + assertEquals(channel, testListener.getChannel()); + } + + @Test + void testChannelDisconnectedEvent() { + AbstractNettyRemotingClient spyClient = spy(client); + spyClient.onChannelInactive(channel); + + assertTrue(testListener.isDisconnectedCalled()); + assertEquals(channel, testListener.getChannel()); + verify(spyClient, times(1)).cleanupResourcesForChannel(channel); + } + + @Test + void testChannelExceptionEvent() { + AbstractNettyRemotingClient spyClient = spy(client); + Exception testException = new RuntimeException("Test exception"); + spyClient.onChannelException(channel, testException); + + assertTrue(testListener.isExceptionCalled()); + assertEquals(channel, testListener.getChannel()); + assertEquals(testException, testListener.getLastException()); + verify(spyClient, times(1)).cleanupResourcesForChannel(channel); + } + + private static class TestChannelEventListener implements ChannelEventListener { + private boolean connectedCalled = false; + private boolean disconnectedCalled = false; + private boolean exceptionCalled = false; + private boolean idleCalled = false; + private Throwable lastException = null; + private Channel channel; + + @Override + public void onChannelConnected(Channel channel) { + this.channel = channel; + this.connectedCalled = true; + } + + @Override + public void onChannelDisconnected(Channel channel) { + this.channel = channel; + this.disconnectedCalled = true; + } + + @Override + public void onChannelException(Channel channel, Throwable cause) { + this.channel = channel; + this.exceptionCalled = true; + this.lastException = cause; + } + + @Override + public void onChannelIdle(Channel channel) { + this.channel = channel; + this.idleCalled = true; + } + + public void reset() { + this.channel = null; + this.connectedCalled = false; + this.disconnectedCalled = false; + this.exceptionCalled = false; + this.idleCalled = false; + this.lastException = null; + } + + public boolean isConnectedCalled() { + return connectedCalled; + } + + public boolean isDisconnectedCalled() { + return disconnectedCalled; + } + + public boolean isExceptionCalled() { + return exceptionCalled; + } + + public boolean isIdleCalled() { + return idleCalled; + } + + public Throwable getLastException() { + return lastException; + } + + public Channel getChannel() { + return channel; + } + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java new file mode 100644 index 0000000000..34b15847db --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.seata.core.protocol.MergeMessage; +import org.apache.seata.core.protocol.MergedWarpMessage; +import org.apache.seata.core.protocol.MessageFuture; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ResourceCleanupTest { + + private AbstractNettyRemotingClient client; + + @Mock + private Channel channel; + + @Mock + private ChannelId channelId; + + private Map<Integer, MessageFuture> futures; + private Map<Integer, MergeMessage> mergeMsgMap; + private Map<String, BlockingQueue<RpcMessage>> basketMap; + private Map<String, Channel> channels; + + @BeforeEach + void setUp() throws Exception { + client = TmNettyRemotingClient.getInstance(); + + Field futuresField = AbstractNettyRemoting.class.getDeclaredField("futures"); + futuresField.setAccessible(true); + futures = (Map<Integer, MessageFuture>) futuresField.get(client); + + Field field = AbstractNettyRemotingClient.class.getDeclaredField("mergeMsgMap"); + field.setAccessible(true); + mergeMsgMap = (Map<Integer, MergeMessage>) field.get(client); + + Field basketMapField = AbstractNettyRemotingClient.class.getDeclaredField("basketMap"); + basketMapField.setAccessible(true); + basketMap = (Map<String, BlockingQueue<RpcMessage>>) basketMapField.get(client); + + Field channelManagerField = AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager"); + channelManagerField.setAccessible(true); + NettyClientChannelManager clientChannelManager = (NettyClientChannelManager) channelManagerField.get(client); + + Field channelsField = clientChannelManager.getClass().getDeclaredField("channels"); + channelsField.setAccessible(true); + channels = (Map<String, Channel>) channelsField.get(clientChannelManager); + } + + @Test + void testCleanupMessageFuturesOnChannelDisconnection() { + when(channel.id()).thenReturn(channelId); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 8091)); + + MessageFuture messageFuture1 = new MessageFuture(); + RpcMessage rpcMessage1 = createRpcMessage(1); + messageFuture1.setRequestMessage(rpcMessage1); + futures.put(1, messageFuture1); + + MessageFuture messageFuture2 = new MessageFuture(); + RpcMessage rpcMessage2 = createRpcMessage(2); + messageFuture2.setRequestMessage(rpcMessage2); + futures.put(2, messageFuture2); + + int parentId = 100; + MergedWarpMessage mergeMessage = new MergedWarpMessage(); + mergeMessage.msgIds = new ArrayList<>(); + mergeMessage.msgIds.add(1); + mergeMessage.msgIds.add(2); + + mergeMsgMap.put(parentId, mergeMessage); + + String serverAddress = "127.0.0.1:8091"; + channels.put(serverAddress, channel); + + BlockingQueue<RpcMessage> basket = new LinkedBlockingQueue<>(); + basket.add(rpcMessage1); + basket.add(rpcMessage2); + basketMap.put(serverAddress, basket); + + client.cleanupResourcesForChannel(channel); + + assertFalse(futures.containsKey(1), "Future ID 1 has not been removed"); + assertFalse(futures.containsKey(2), "Future ID 2 has not been removed"); + + assertThrows(RuntimeException.class, () -> messageFuture1.get(0, java.util.concurrent.TimeUnit.MILLISECONDS)); + assertThrows(RuntimeException.class, () -> messageFuture2.get(0, java.util.concurrent.TimeUnit.MILLISECONDS)); + } + + @Test + void testCleanupWithNullChannel() { + MessageFuture messageFuture = new MessageFuture(); + RpcMessage rpcMessage = createRpcMessage(1); + messageFuture.setRequestMessage(rpcMessage); + futures.put(1, messageFuture); + + assertDoesNotThrow(() -> client.cleanupResourcesForChannel(null)); + assertTrue(futures.containsKey(1), "Future ID 1 should still exist"); + } + + private RpcMessage createRpcMessage(int id) { + RpcMessage message = new RpcMessage(); + message.setId(id); + message.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + message.setBody("test-body-" + id); + return message; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org