This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new c5efbd92d9 optimize: Add `ChannelEventListener` support to prevent 
memory leaks (#7337)
c5efbd92d9 is described below

commit c5efbd92d97b63ef382c17fcab84719ab3b41a77
Author: Yongjun Hong <dev.yongj...@gmail.com>
AuthorDate: Mon May 19 18:38:40 2025 +0900

    optimize: Add `ChannelEventListener` support to prevent memory leaks (#7337)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 .../org/apache/seata/core/rpc/RemotingClient.java  |  15 ++
 .../rpc/netty/AbstractNettyRemotingClient.java     | 206 ++++++++++++++-
 .../seata/core/rpc/netty/ChannelEventHandler.java  | 107 ++++++++
 .../seata/core/rpc/netty/ChannelEventListener.java |  15 +-
 ...nelEventListener.java => ChannelEventType.java} |  36 +--
 .../core/rpc/netty/RmNettyRemotingClient.java      |  20 ++
 .../core/rpc/netty/TmNettyRemotingClient.java      |  20 ++
 .../netty/ChannelEventHandlerIntegrationTest.java  | 282 +++++++++++++++++++++
 .../core/rpc/netty/ChannelEventListenerTest.java   | 168 ++++++++++++
 .../seata/core/rpc/netty/ResourceCleanupTest.java  | 140 ++++++++++
 12 files changed, 974 insertions(+), 37 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 296eec3bae..e17a54162d 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -35,6 +35,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7282](https://github.com/apache/incubator-seata/pull/7282)] optimize 
unexpected NullPointerException in lookup method in FileRegistryServiceImpl 
class
 - [[#7310](https://github.com/seata/seata/pull/7310)] Optimize minor issues in 
the naming-server
 - [[#7329](https://github.com/apache/incubator-seata/pull/7329)] upgrade 
tomcat to 9.0.100
+- [[#7337](https://github.com/apache/incubator-seata/pull/7337)] Add 
ChannelEventListener support to prevent memory leaks
 
 
 ### security:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 941ac2d168..98887b0bbe 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -34,6 +34,7 @@
 - [[#7282](https://github.com/apache/incubator-seata/pull/7282)] 
优化FileRegistryServiceImpl类lookup的NullPointerException问题
 - [[#7310](https://github.com/seata/seata/pull/7310)] 优化naming-server中的一些小问题
 - [[#7329](https://github.com/apache/incubator-seata/pull/7329)] 将 tomcat 升级到 
9.0.100
+- [[#7337](https://github.com/apache/incubator-seata/pull/7337)] 添加 
ChannelEventListener 支持以防止内存泄漏
 
 
 ### security:
diff --git a/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java 
b/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java
index e9add834a9..1876b2fb11 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/RemotingClient.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException;
 import io.netty.channel.Channel;
 import org.apache.seata.core.protocol.AbstractMessage;
 import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ChannelEventListener;
 import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
 import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
 import org.apache.seata.core.rpc.processor.RemotingProcessor;
@@ -101,4 +102,18 @@ public interface RemotingClient {
      * @param executor    thread pool
      */
     void registerProcessor(final int messageType, final RemotingProcessor 
processor, final ExecutorService executor);
+
+    /**
+     * register channel event listener
+     *
+     * @param channelEventListener {@link ChannelEventListener}
+     */
+    void registerChannelEventListener(ChannelEventListener 
channelEventListener);
+
+    /**
+     * unregister channel event listener
+     *
+     * @param channelEventListener {@link ChannelEventListener}
+     */
+    void unregisterChannelEventListener(ChannelEventListener 
channelEventListener);
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 8618c3030b..2867cca2b6 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -16,12 +16,18 @@
  */
 package org.apache.seata.core.rpc.netty;
 
+import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelId;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
@@ -82,6 +88,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
     private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
     private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
     private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
+
+    private final CopyOnWriteArrayList<ChannelEventListener> 
channelEventListeners = new CopyOnWriteArrayList<>();
+
     protected final Object mergeLock = new Object();
 
     /**
@@ -130,7 +139,7 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         super(messageExecutor);
         this.transactionRole = transactionRole;
         clientBootstrap = new NettyClientBootstrap(nettyClientConfig, 
eventExecutorGroup, transactionRole);
-        clientBootstrap.setChannelHandlers(new ClientHandler());
+        clientBootstrap.setChannelHandlers(new ClientHandler(), new 
ChannelEventHandler(this));
         clientChannelManager = new NettyClientChannelManager(
             new NettyPoolableFactory(this, clientBootstrap), 
getPoolKeyFunction(), nettyClientConfig);
     }
@@ -336,6 +345,201 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      */
     protected abstract long getRpcRequestTimeout();
 
+
+    /**
+     * Registers a channel event listener to receive channel events.
+     * If the listener is already registered, it will not be added again.
+     *
+     * @param channelEventListener the channel event listener to register
+     */
+    @Override
+    public void registerChannelEventListener(ChannelEventListener 
channelEventListener) {
+        if (channelEventListener != null) {
+            channelEventListeners.addIfAbsent(channelEventListener);
+            LOGGER.info("register channel event listener: {}", 
channelEventListener.getClass().getName());
+        }
+    }
+
+    /**
+     * Unregisters a previously registered channel event listener.
+     *
+     * @param channelEventListener the channel event listener to unregister
+     */
+    @Override
+    public void unregisterChannelEventListener(ChannelEventListener 
channelEventListener) {
+        if (channelEventListener != null) {
+            channelEventListeners.remove(channelEventListener);
+            LOGGER.info("unregister channel event listener: {}", 
channelEventListener.getClass().getName());
+        }
+    }
+
+    /**
+     * Handles channel active events from Netty.
+     * Fires a CONNECTED event to all registered listeners.
+     *
+     * @param channel the channel that became active
+     */
+    public void onChannelActive(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.CONNECTED);
+    }
+
+    /**
+     * Handles channel inactive events from Netty.
+     * Fires a DISCONNECTED event to all registered listeners and cleans up 
resources.
+     *
+     * @param channel the channel that became inactive
+     */
+    public void onChannelInactive(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.DISCONNECTED);
+        cleanupResourcesForChannel(channel);
+    }
+
+    /**
+     * Handles channel exception events from Netty.
+     * Fires an EXCEPTION event to all registered listeners and cleans up 
resources.
+     *
+     * @param channel the channel where the exception occurred
+     * @param cause the throwable that represents the exception
+     */
+    public void onChannelException(Channel channel, Throwable cause) {
+        fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause);
+        cleanupResourcesForChannel(channel);
+    }
+
+    /**
+     * Handles channel idle events from Netty.
+     * Fires an IDLE event to all registered listeners.
+     *
+     * @param channel the channel that became idle
+     */
+    public void onChannelIdle(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.IDLE);
+    }
+
+    /**
+     * Cleans up resources associated with a channel that has been 
disconnected.
+     * This includes collecting message IDs for the channel and cleaning up 
their futures.
+     *
+     * @param channel the channel for which resources should be cleaned up
+     */
+    protected void cleanupResourcesForChannel(Channel channel) {
+        if (channel == null) {
+            return;
+        }
+        ChannelException cause = new ChannelException(
+            String.format("Channel disconnected: %s", 
channel.remoteAddress()));
+
+        Set<Integer> messageIds = collectMessageIdsForChannel(channel.id());
+        cleanupFuturesForMessageIds(messageIds, cause);
+
+        LOGGER.info("Cleaned up {} pending requests for disconnected channel: 
{}",
+            messageIds.size(), channel.remoteAddress());
+    }
+
+    /**
+     * Collects message IDs associated with a specific channel.
+     * This is used during channel cleanup to identify pending requests.
+     *
+     * @param channelId the ID of the channel
+     * @return a set of message IDs associated with the channel
+     */
+    private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) {
+        Set<Integer> messageIds = new HashSet<>();
+
+        String serverAddress = null;
+        for (Map.Entry<String, Channel> entry : 
clientChannelManager.getChannels().entrySet()) {
+            Channel channel = entry.getValue();
+            if (channelId.equals(channel.id())) {
+                serverAddress = entry.getKey();
+                break;
+            }
+        }
+
+        if (serverAddress == null) {
+            return messageIds;
+        }
+
+        Iterator<Map.Entry<Integer, MergeMessage>> it = 
mergeMsgMap.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Integer, MergeMessage> entry = it.next();
+            MergeMessage mergeMessage = entry.getValue();
+
+            if (mergeMessage instanceof MergedWarpMessage) {
+                MergedWarpMessage warpMessage = (MergedWarpMessage) 
mergeMessage;
+
+                BlockingQueue<RpcMessage> basket = 
basketMap.get(serverAddress);
+                if (basket != null && !basket.isEmpty()) {
+                    messageIds.addAll(warpMessage.msgIds);
+                    it.remove();
+                }
+            }
+        }
+
+        return messageIds;
+    }
+
+    /**
+     * Cleans up futures for a set of message IDs.
+     * This completes futures with an exception to notify waiting threads.
+     *
+     * @param messageIds the set of message IDs whose futures should be 
cleaned up
+     * @param cause the exception to set as the result for each future
+     */
+    private void cleanupFuturesForMessageIds(Set<Integer> messageIds, 
Exception cause) {
+        for (Integer messageId : messageIds) {
+            MessageFuture future = futures.remove(messageId);
+            if (future != null) {
+                future.setResultMessage(cause);
+            }
+        }
+    }
+
+    /**
+     * Fires a channel event without an associated cause.
+     * This is an overloaded version that calls {@link 
#fireChannelEvent(Channel, ChannelEventType, Throwable)}
+     * with a null cause.
+     *
+     * @param channel the channel associated with the event
+     * @param eventType the type of event that occurred
+     */
+    protected void fireChannelEvent(Channel channel, ChannelEventType 
eventType) {
+        fireChannelEvent(channel, eventType, null);
+    }
+
+    /**
+     * Fires a channel event to all registered listeners.
+     * This method dispatches the event to the appropriate method on each 
listener
+     * based on the event type.
+     *
+     * @param channel the channel associated with the event
+     * @param eventType the type of event that occurred
+     * @param cause the cause of the event (maybe null for certain event types)
+     */
+    protected void fireChannelEvent(Channel channel, ChannelEventType 
eventType, Throwable cause) {
+        for (ChannelEventListener listener : channelEventListeners) {
+            try {
+                switch (eventType) {
+                    case CONNECTED:
+                        listener.onChannelConnected(channel);
+                        break;
+                    case DISCONNECTED:
+                        listener.onChannelDisconnected(channel);
+                        break;
+                    case EXCEPTION:
+                        listener.onChannelException(channel, cause);
+                        break;
+                    case IDLE:
+                        listener.onChannelIdle(channel);
+                        break;
+                    default:
+                        break;
+                }
+            } catch (Exception e) {
+                LOGGER.warn("Error while firing channel {} event", eventType, 
e);
+            }
+        }
+    }
+
     /**
      * The type Merged send runnable.
      */
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java
new file mode 100644
index 0000000000..ea983330ca
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler class for Netty channel events.
+ * Detects channel activation, deactivation, exceptions, and idle state events
+ * and forwards these events to the AbstractNettyRemotingClient.
+ */
+@Sharable
+public class ChannelEventHandler extends ChannelDuplexHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ChannelEventHandler.class);
+    private final AbstractNettyRemotingClient remotingClient;
+
+    public ChannelEventHandler(AbstractNettyRemotingClient remotingClient) {
+        this.remotingClient = remotingClient;
+    }
+
+    /**
+     * Called when a channel becomes active.
+     * Logs the channel activation event and notifies the remoting client.
+     *
+     * @param ctx the channel handler context
+     * @throws Exception if an exception occurs
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Channel active: {}", ctx.channel().remoteAddress());
+        }
+        remotingClient.onChannelActive(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    /**
+     * Called when a channel becomes inactive.
+     * Logs the channel deactivation event and notifies the remoting client.
+     *
+     * @param ctx the channel handler context
+     * @throws Exception if an exception occurs
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        Channel channel = ctx.channel();
+        LOGGER.warn("Channel inactive: {}", channel.remoteAddress());
+        remotingClient.onChannelInactive(channel);
+        super.channelInactive(ctx);
+    }
+
+    /**
+     * Called when an exception occurs in a channel.
+     * Logs the exception event and notifies the remoting client.
+     *
+     * @param ctx the channel handler context
+     * @param cause the thrown exception
+     * @throws Exception if an exception occurs
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
+        Channel channel = ctx.channel();
+        LOGGER.warn("Channel exception: {}, cause: {}", 
channel.remoteAddress(), cause.getMessage());
+        remotingClient.onChannelException(channel, cause);
+        super.exceptionCaught(ctx, cause);
+    }
+
+    /**
+     * Called when a user event is triggered.
+     * Primarily handles IdleStateEvent, logs the event and notifies the 
remoting client.
+     *
+     * @param ctx the channel handler context
+     * @param evt the triggered event
+     * @throws Exception if an exception occurs
+     */
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Channel idle: {}", 
ctx.channel().remoteAddress());
+            }
+            remotingClient.onChannelIdle(ctx.channel());
+        }
+        super.userEventTriggered(ctx, evt);
+    }
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java
index 958451a9e4..c654635824 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java
@@ -20,39 +20,34 @@ import io.netty.channel.Channel;
 
 /**
  * The interface Channel event listener.
- *
  */
-@Deprecated
 public interface ChannelEventListener {
     /**
      * On channel connect.
      *
-     * @param remoteAddr the remote addr
      * @param channel    the channel
      */
-    void onChannelConnect(final String remoteAddr, final Channel channel);
+    default void onChannelConnected(final Channel channel) {}
 
     /**
      * On channel close.
      *
-     * @param remoteAddr the remote addr
      * @param channel    the channel
      */
-    void onChannelClose(final String remoteAddr, final Channel channel);
+    default void onChannelDisconnected(final Channel channel) {}
 
     /**
      * On channel exception.
      *
-     * @param remoteAddr the remote addr
      * @param channel    the channel
+     * @param cause      the cause
      */
-    void onChannelException(final String remoteAddr, final Channel channel);
+    default void onChannelException(final Channel channel, Throwable cause) {}
 
     /**
      * On channel idle.
      *
-     * @param remoteAddr the remote addr
      * @param channel    the channel
      */
-    void onChannelIdle(final String remoteAddr, final Channel channel);
+    default void onChannelIdle(final Channel channel) {}
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java
similarity index 51%
copy from 
core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java
copy to core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java
index 958451a9e4..c130d9f1d6 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventListener.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelEventType.java
@@ -16,43 +16,27 @@
  */
 package org.apache.seata.core.rpc.netty;
 
-import io.netty.channel.Channel;
-
 /**
- * The interface Channel event listener.
- *
+ * Enum representing different types of channel events.
  */
-@Deprecated
-public interface ChannelEventListener {
+public enum ChannelEventType {
     /**
-     * On channel connect.
-     *
-     * @param remoteAddr the remote addr
-     * @param channel    the channel
+     * Channel connected.
      */
-    void onChannelConnect(final String remoteAddr, final Channel channel);
+    CONNECTED,
 
     /**
-     * On channel close.
-     *
-     * @param remoteAddr the remote addr
-     * @param channel    the channel
+     * Channel disconnected.
      */
-    void onChannelClose(final String remoteAddr, final Channel channel);
+    DISCONNECTED,
 
     /**
-     * On channel exception.
-     *
-     * @param remoteAddr the remote addr
-     * @param channel    the channel
+     * Channel exception.
      */
-    void onChannelException(final String remoteAddr, final Channel channel);
+    EXCEPTION,
 
     /**
-     * On channel idle.
-     *
-     * @param remoteAddr the remote addr
-     * @param channel    the channel
+     * Channel idle.
      */
-    void onChannelIdle(final String remoteAddr, final Channel channel);
+    IDLE
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
index 872cfa2b2b..cccf894fc0 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java
@@ -86,6 +86,26 @@ public final class RmNettyRemotingClient extends 
AbstractNettyRemotingClient {
                 
getClientChannelManager().initReconnect(transactionServiceGroup, failFast);
             }
         }
+
+        registerChannelEventListener(new ChannelEventListener() {
+            @Override public void onChannelConnected(Channel channel) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Channel active: {}", 
channel.remoteAddress());
+                }
+            }
+
+            @Override public void onChannelDisconnected(Channel channel) {
+                LOGGER.warn("Channel inactive: {}", channel.remoteAddress());
+            }
+
+            @Override public void onChannelException(Channel channel, 
Throwable cause) {
+                LOGGER.error("Channel exception: {}", channel.remoteAddress(), 
cause);
+            }
+
+            @Override public void onChannelIdle(Channel channel) {
+                LOGGER.warn("Channel idle: {}", channel.remoteAddress());
+            }
+        });
     }
 
     private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, 
EventExecutorGroup eventExecutorGroup,
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
index 4873f8c347..9ee68f31ef 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java
@@ -84,6 +84,26 @@ public final class TmNettyRemotingClient extends 
AbstractNettyRemotingClient {
                 }
             }
         });
+
+        registerChannelEventListener(new ChannelEventListener() {
+            @Override public void onChannelConnected(Channel channel) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Channel active: {}", 
channel.remoteAddress());
+                }
+            }
+
+            @Override public void onChannelDisconnected(Channel channel) {
+                LOGGER.warn("Channel inactive: {}", channel.remoteAddress());
+            }
+
+            @Override public void onChannelException(Channel channel, 
Throwable cause) {
+                LOGGER.error("Channel exception: {}", channel.remoteAddress(), 
cause);
+            }
+
+            @Override public void onChannelIdle(Channel channel) {
+                LOGGER.warn("Channel idle: {}", channel.remoteAddress());
+            }
+        });
     }
 
     /**
diff --git 
a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
new file mode 100644
index 0000000000..ccf848c377
--- /dev/null
+++ 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SingleThreadEventLoop;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ChannelEventHandlerIntegrationTest {
+
+    private static final int SERVER_PORT = 8091;
+    private static final String SERVER_HOST = "127.0.0.1";
+    private static final int TIMEOUT_SECONDS = 5;
+
+    private static EventLoopGroup bossGroup;
+    private static EventLoopGroup workerGroup;
+    private static Channel serverChannel;
+
+    @Mock
+    private AbstractNettyRemotingClient mockRemotingClient;
+
+    @Captor
+    private ArgumentCaptor<Channel> channelCaptor;
+
+    @Captor
+    private ArgumentCaptor<Throwable> throwableCaptor;
+
+    private ChannelEventHandler channelEventHandler;
+    private EventLoopGroup clientGroup;
+    private Channel clientChannel;
+    private CountDownLatch channelActiveLatch;
+    private CountDownLatch channelInactiveLatch;
+    private CountDownLatch exceptionCaughtLatch;
+    private CountDownLatch idleEventLatch;
+
+    @BeforeAll
+    static void setupClass() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup(1);
+        workerGroup = new NioEventLoopGroup();
+
+        ServerBootstrap serverBootstrap = new ServerBootstrap();
+        serverBootstrap
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, 
TimeUnit.SECONDS));
+                    }
+                });
+
+        serverChannel = serverBootstrap.bind(SERVER_PORT).sync().channel();
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        if (serverChannel != null) {
+            serverChannel.close();
+        }
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    @BeforeEach
+    void setUp() {
+        channelEventHandler = new ChannelEventHandler(mockRemotingClient);
+
+        clientGroup = new NioEventLoopGroup();
+        channelActiveLatch = new CountDownLatch(1);
+        channelInactiveLatch = new CountDownLatch(1);
+        exceptionCaughtLatch = new CountDownLatch(1);
+        idleEventLatch = new CountDownLatch(1);
+
+        lenient()
+                .doAnswer(invocation -> {
+                    channelActiveLatch.countDown();
+                    return null;
+                })
+                .when(mockRemotingClient)
+                .onChannelActive(any(Channel.class));
+
+        lenient()
+                .doAnswer(invocation -> {
+                    channelInactiveLatch.countDown();
+                    return null;
+                })
+                .when(mockRemotingClient)
+                .onChannelInactive(any(Channel.class));
+
+        lenient()
+                .doAnswer(invocation -> {
+                    exceptionCaughtLatch.countDown();
+                    return null;
+                })
+                .when(mockRemotingClient)
+                .onChannelException(any(Channel.class), any(Throwable.class));
+
+        lenient()
+                .doAnswer(invocation -> {
+                    idleEventLatch.countDown();
+                    return null;
+                })
+                .when(mockRemotingClient)
+                .onChannelIdle(any(Channel.class));
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (clientChannel != null) {
+            clientChannel.close();
+        }
+        if (clientGroup != null) {
+            clientGroup.shutdownGracefully();
+        }
+    }
+
+    @Test
+    void testChannelActive() throws Exception {
+        connectClient();
+
+        assertTrue(
+                channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+                "Channel activation event was not detected");
+
+        verify(mockRemotingClient).onChannelActive(channelCaptor.capture());
+        Channel capturedChannel = channelCaptor.getValue();
+        assertNotNull(capturedChannel);
+
+        SocketAddress remoteAddress = capturedChannel.remoteAddress();
+        assertInstanceOf(InetSocketAddress.class, remoteAddress);
+
+        InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress;
+        assertEquals(SERVER_HOST, inetAddress.getHostString());
+        assertEquals(SERVER_PORT, inetAddress.getPort());
+    }
+
+    @Test
+    void testChannelInactive() throws Exception {
+        connectClient();
+
+        clientChannel.close().sync();
+
+        assertTrue(
+                channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+                "Channel deactivation event was not detected");
+
+        verify(mockRemotingClient).onChannelInactive(any(Channel.class));
+    }
+
+    @Test
+    void testChannelInactiveByServer() throws Exception {
+        connectClient();
+
+        DefaultChannelGroup serverChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+        serverChannels.addAll(collectServerChannels(workerGroup));
+        Channel serverSideClientChannel = serverChannels.stream()
+                .filter(ch -> ch.isActive() && ch.remoteAddress() != null)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Failed to find client 
channel on server side"));
+
+        serverSideClientChannel.close().sync();
+        assertTrue(
+                channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+                "Channel inactive event was not detected on client side when 
server closed the connection");
+        verify(mockRemotingClient).onChannelInactive(any(Channel.class));
+    }
+
+    @Test
+    void testExceptionCaught() throws Exception {
+        connectClient();
+
+        RuntimeException testException = new RuntimeException("Test 
exception");
+        clientChannel.pipeline().fireExceptionCaught(testException);
+
+        assertTrue(exceptionCaughtLatch.await(TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Exception event was not detected");
+
+        verify(mockRemotingClient).onChannelException(any(Channel.class), 
throwableCaptor.capture());
+
+        Throwable capturedException = throwableCaptor.getValue();
+        assertNotNull(capturedException);
+    }
+
+    @Test
+    void testChannelIdle() throws Exception {
+        connectClient(500);
+
+        assertTrue(idleEventLatch.await(3, TimeUnit.SECONDS), "Idle event was 
not detected");
+
+        verify(mockRemotingClient).onChannelIdle(any(Channel.class));
+    }
+
+    private void connectClient() throws InterruptedException {
+        connectClient(0);
+    }
+
+    private void connectClient(int idleTimeoutMillis) throws 
InterruptedException {
+        Bootstrap bootstrap = new Bootstrap();
+        
bootstrap.group(clientGroup).channel(NioSocketChannel.class).handler(new 
ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(SocketChannel ch) {
+                ChannelPipeline pipeline = ch.pipeline();
+                if (idleTimeoutMillis > 0) {
+                    pipeline.addLast(new IdleStateHandler(0, 
idleTimeoutMillis, 0, TimeUnit.MILLISECONDS));
+                }
+                pipeline.addLast(channelEventHandler);
+            }
+        });
+
+        ChannelFuture future = bootstrap.connect(SERVER_HOST, 
SERVER_PORT).sync();
+        clientChannel = future.channel();
+        assertTrue(clientChannel.isActive());
+    }
+
+    private DefaultChannelGroup collectServerChannels(EventLoopGroup 
workerGroup) throws InterruptedException {
+        DefaultChannelGroup channels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+        for (EventExecutor executor : workerGroup) {
+            if (executor instanceof SingleThreadEventLoop) {
+                SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) 
executor;
+
+                executor.submit(() -> {
+                            Iterator<Channel> it = 
eventLoop.registeredChannelsIterator();
+                            while (it.hasNext()) {
+                                Channel ch = it.next();
+                                if (ch.isActive() && ch instanceof 
SocketChannel) {
+                                    channels.add(ch);
+                                }
+                            }
+                            return null;
+                        })
+                        .sync();
+            }
+        }
+        return channels;
+    }
+}
diff --git 
a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java
 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java
new file mode 100644
index 0000000000..14534c5cba
--- /dev/null
+++ 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventListenerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.channel.Channel;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+class ChannelEventListenerTest {
+
+    private AbstractNettyRemotingClient client;
+
+    @Mock
+    private Channel channel;
+
+    private TestChannelEventListener testListener;
+
+    @BeforeEach
+    void setUp() {
+        client = TmNettyRemotingClient.getInstance();
+        testListener = new TestChannelEventListener();
+        client.registerChannelEventListener(testListener);
+    }
+
+    @Test
+    void testRegisterAndUnregisterListener() {
+        client.onChannelActive(channel);
+
+        assertTrue(testListener.isConnectedCalled());
+        assertEquals(channel, testListener.getChannel());
+
+        client.unregisterChannelEventListener(testListener);
+        testListener.reset();
+        client.onChannelActive(channel);
+
+        assertFalse(testListener.isConnectedCalled());
+        assertNotEquals(channel, testListener.getChannel());
+        assertNull(testListener.getChannel());
+    }
+
+    @Test
+    void testChannelConnectedEvent() {
+        client.onChannelActive(channel);
+
+        assertTrue(testListener.isConnectedCalled());
+        assertEquals(channel, testListener.getChannel());
+    }
+
+    @Test
+    void testChannelIdleEvent() {
+        client.onChannelIdle(channel);
+
+        assertTrue(testListener.isIdleCalled());
+        assertEquals(channel, testListener.getChannel());
+    }
+
+    @Test
+    void testChannelDisconnectedEvent() {
+        AbstractNettyRemotingClient spyClient = spy(client);
+        spyClient.onChannelInactive(channel);
+
+        assertTrue(testListener.isDisconnectedCalled());
+        assertEquals(channel, testListener.getChannel());
+        verify(spyClient, times(1)).cleanupResourcesForChannel(channel);
+    }
+
+    @Test
+    void testChannelExceptionEvent() {
+        AbstractNettyRemotingClient spyClient = spy(client);
+        Exception testException = new RuntimeException("Test exception");
+        spyClient.onChannelException(channel, testException);
+
+        assertTrue(testListener.isExceptionCalled());
+        assertEquals(channel, testListener.getChannel());
+        assertEquals(testException, testListener.getLastException());
+        verify(spyClient, times(1)).cleanupResourcesForChannel(channel);
+    }
+
+    private static class TestChannelEventListener implements 
ChannelEventListener {
+        private boolean connectedCalled = false;
+        private boolean disconnectedCalled = false;
+        private boolean exceptionCalled = false;
+        private boolean idleCalled = false;
+        private Throwable lastException = null;
+        private Channel channel;
+
+        @Override
+        public void onChannelConnected(Channel channel) {
+            this.channel = channel;
+            this.connectedCalled = true;
+        }
+
+        @Override
+        public void onChannelDisconnected(Channel channel) {
+            this.channel = channel;
+            this.disconnectedCalled = true;
+        }
+
+        @Override
+        public void onChannelException(Channel channel, Throwable cause) {
+            this.channel = channel;
+            this.exceptionCalled = true;
+            this.lastException = cause;
+        }
+
+        @Override
+        public void onChannelIdle(Channel channel) {
+            this.channel = channel;
+            this.idleCalled = true;
+        }
+
+        public void reset() {
+            this.channel = null;
+            this.connectedCalled = false;
+            this.disconnectedCalled = false;
+            this.exceptionCalled = false;
+            this.idleCalled = false;
+            this.lastException = null;
+        }
+
+        public boolean isConnectedCalled() {
+            return connectedCalled;
+        }
+
+        public boolean isDisconnectedCalled() {
+            return disconnectedCalled;
+        }
+
+        public boolean isExceptionCalled() {
+            return exceptionCalled;
+        }
+
+        public boolean isIdleCalled() {
+            return idleCalled;
+        }
+
+        public Throwable getLastException() {
+            return lastException;
+        }
+
+        public Channel getChannel() {
+            return channel;
+        }
+    }
+}
diff --git 
a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
new file mode 100644
index 0000000000..34b15847db
--- /dev/null
+++ 
b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.seata.core.protocol.MergeMessage;
+import org.apache.seata.core.protocol.MergedWarpMessage;
+import org.apache.seata.core.protocol.MessageFuture;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ResourceCleanupTest {
+
+    private AbstractNettyRemotingClient client;
+
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private ChannelId channelId;
+
+    private Map<Integer, MessageFuture> futures;
+    private Map<Integer, MergeMessage> mergeMsgMap;
+    private Map<String, BlockingQueue<RpcMessage>> basketMap;
+    private Map<String, Channel> channels;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        client = TmNettyRemotingClient.getInstance();
+
+        Field futuresField = 
AbstractNettyRemoting.class.getDeclaredField("futures");
+        futuresField.setAccessible(true);
+        futures = (Map<Integer, MessageFuture>) futuresField.get(client);
+
+        Field field = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeMsgMap");
+        field.setAccessible(true);
+        mergeMsgMap = (Map<Integer, MergeMessage>) field.get(client);
+
+        Field basketMapField = 
AbstractNettyRemotingClient.class.getDeclaredField("basketMap");
+        basketMapField.setAccessible(true);
+        basketMap = (Map<String, BlockingQueue<RpcMessage>>) 
basketMapField.get(client);
+
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        NettyClientChannelManager clientChannelManager = 
(NettyClientChannelManager) channelManagerField.get(client);
+
+        Field channelsField = 
clientChannelManager.getClass().getDeclaredField("channels");
+        channelsField.setAccessible(true);
+        channels = (Map<String, Channel>) 
channelsField.get(clientChannelManager);
+    }
+
+    @Test
+    void testCleanupMessageFuturesOnChannelDisconnection() {
+        when(channel.id()).thenReturn(channelId);
+        when(channel.remoteAddress()).thenReturn(new 
InetSocketAddress("127.0.0.1", 8091));
+
+        MessageFuture messageFuture1 = new MessageFuture();
+        RpcMessage rpcMessage1 = createRpcMessage(1);
+        messageFuture1.setRequestMessage(rpcMessage1);
+        futures.put(1, messageFuture1);
+
+        MessageFuture messageFuture2 = new MessageFuture();
+        RpcMessage rpcMessage2 = createRpcMessage(2);
+        messageFuture2.setRequestMessage(rpcMessage2);
+        futures.put(2, messageFuture2);
+
+        int parentId = 100;
+        MergedWarpMessage mergeMessage = new MergedWarpMessage();
+        mergeMessage.msgIds = new ArrayList<>();
+        mergeMessage.msgIds.add(1);
+        mergeMessage.msgIds.add(2);
+
+        mergeMsgMap.put(parentId, mergeMessage);
+
+        String serverAddress = "127.0.0.1:8091";
+        channels.put(serverAddress, channel);
+
+        BlockingQueue<RpcMessage> basket = new LinkedBlockingQueue<>();
+        basket.add(rpcMessage1);
+        basket.add(rpcMessage2);
+        basketMap.put(serverAddress, basket);
+
+        client.cleanupResourcesForChannel(channel);
+
+        assertFalse(futures.containsKey(1), "Future ID 1 has not been 
removed");
+        assertFalse(futures.containsKey(2), "Future ID 2 has not been 
removed");
+
+        assertThrows(RuntimeException.class, () -> messageFuture1.get(0, 
java.util.concurrent.TimeUnit.MILLISECONDS));
+        assertThrows(RuntimeException.class, () -> messageFuture2.get(0, 
java.util.concurrent.TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    void testCleanupWithNullChannel() {
+        MessageFuture messageFuture = new MessageFuture();
+        RpcMessage rpcMessage = createRpcMessage(1);
+        messageFuture.setRequestMessage(rpcMessage);
+        futures.put(1, messageFuture);
+
+        assertDoesNotThrow(() -> client.cleanupResourcesForChannel(null));
+        assertTrue(futures.containsKey(1), "Future ID 1 should still exist");
+    }
+
+    private RpcMessage createRpcMessage(int id) {
+        RpcMessage message = new RpcMessage();
+        message.setId(id);
+        message.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+        message.setBody("test-body-" + id);
+        return message;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org


Reply via email to