This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 5cbc488623 optimize: update resource cleanup logic for channel disconnection (#7360)
5cbc488623 is described below

commit 5cbc48862364b8959dae4c14531a64afff2626d7
Author: Yongjun Hong <yongj...@apache.org>
AuthorDate: Sat May 24 22:14:54 2025 +0900

    optimize: update resource cleanup logic for channel disconnection (#7360)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   2 +-
 .../rpc/netty/AbstractNettyRemotingClient.java     | 146 +++++++++++++--------
 .../netty/ChannelEventHandlerIntegrationTest.java  | 100 +++++++-------
 .../seata/core/rpc/netty/ResourceCleanupTest.java  |  12 ++
 5 files changed, 154 insertions(+), 107 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index cf0360be89..a8fef0c576 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -40,6 +40,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7344](https://github.com/apache/incubator-seata/pull/7344)] raft mode performs transaction size check in advance
 - [[#7337](https://github.com/apache/incubator-seata/pull/7337)] Add ChannelEventListener support to prevent memory leaks
 - [[#7350](https://github.com/apache/incubator-seata/pull/7350)] optimize codecov.yml
+- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] Update resource cleanup logic for channel disconnection
 
 
 ### security:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 2cafefb523..dc139bd5be 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -38,7 +38,7 @@
 - [[#7344](https://github.com/apache/incubator-seata/pull/7344)] raft模式提前检查事务大小
 - [[#7337](https://github.com/apache/incubator-seata/pull/7337)] 添加 ChannelEventListener 支持以防止内存泄漏
 - [[#7350](https://github.com/apache/incubator-seata/pull/7350)] 优化单测覆盖配置
-
+- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] 更新通道断开连接时的资源清理逻辑
 
 ### security:
 
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 2867cca2b6..383b6da509 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -16,8 +16,18 @@
  */
 package org.apache.seata.core.rpc.netty;
 
+import static 
org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelException;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.EventExecutorGroup;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.HashSet;
@@ -35,14 +45,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.concurrent.EventExecutorGroup;
 import org.apache.seata.common.exception.FrameworkErrorCode;
 import org.apache.seata.common.exception.FrameworkException;
 import org.apache.seata.common.thread.NamedThreadFactory;
@@ -69,11 +71,8 @@ import org.apache.seata.discovery.registry.RegistryFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService;
-
 /**
  * The netty remoting client.
- *
  */
 public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting implements RemotingClient {
 
@@ -105,7 +104,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      * Send via asynchronous thread {@link 
AbstractNettyRemotingClient.MergedSendRunnable}
      * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
      */
-    protected final ConcurrentHashMap<String/*serverAddress*/, 
BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
+    protected final ConcurrentHashMap<String /*serverAddress*/, 
BlockingQueue<RpcMessage>> basketMap =
+        new ConcurrentHashMap<>();
+
     private final NettyClientBootstrap clientBootstrap;
     private final NettyClientChannelManager clientChannelManager;
     private final NettyPoolKey.TransactionRole transactionRole;
@@ -115,17 +116,23 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
     @Override
     public void init() {
-        timerExecutor.scheduleAtFixedRate(() -> {
-            try {
-                clientChannelManager.reconnect(getTransactionServiceGroup());
-            } catch (Exception ex) {
-                LOGGER.warn("reconnect server failed. {}", ex.getMessage());
-            }
-        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, 
TimeUnit.MILLISECONDS);
+        timerExecutor.scheduleAtFixedRate(
+            () -> {
+                try {
+                    
clientChannelManager.reconnect(getTransactionServiceGroup());
+                } catch (Exception ex) {
+                    LOGGER.warn("reconnect server failed. {}", 
ex.getMessage());
+                }
+            },
+            SCHEDULE_DELAY_MILLS,
+            SCHEDULE_INTERVAL_MILLS,
+            TimeUnit.MILLISECONDS);
         if (this.isEnableClientBatchSendRequest()) {
-            mergeSendExecutorService = new 
ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
+            mergeSendExecutorService = new ThreadPoolExecutor(
+                MAX_MERGE_SEND_THREAD,
                 MAX_MERGE_SEND_THREAD,
-                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
+                KEEP_ALIVE_TIME,
+                TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
                 new NamedThreadFactory(getThreadPrefix(), 
MAX_MERGE_SEND_THREAD));
             mergeSendExecutorService.submit(new MergedSendRunnable());
@@ -134,8 +141,11 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         clientBootstrap.start();
     }
 
-    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, 
EventExecutorGroup eventExecutorGroup,
-                                       ThreadPoolExecutor messageExecutor, 
NettyPoolKey.TransactionRole transactionRole) {
+    public AbstractNettyRemotingClient(
+        NettyClientConfig nettyClientConfig,
+        EventExecutorGroup eventExecutorGroup,
+        ThreadPoolExecutor messageExecutor,
+        NettyPoolKey.TransactionRole transactionRole) {
         super(messageExecutor);
         this.transactionRole = transactionRole;
         clientBootstrap = new NettyClientBootstrap(nettyClientConfig, 
eventExecutorGroup, transactionRole);
@@ -161,11 +171,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             futures.put(rpcMessage.getId(), messageFuture);
 
             // put message into basketMap
-            BlockingQueue<RpcMessage> basket = 
CollectionUtils.computeIfAbsent(basketMap, serverAddress,
-                key -> new LinkedBlockingQueue<>());
+            BlockingQueue<RpcMessage> basket =
+                CollectionUtils.computeIfAbsent(basketMap, serverAddress, key 
-> new LinkedBlockingQueue<>());
             if (!basket.offer(rpcMessage)) {
-                LOGGER.error("put message into basketMap offer failed, 
serverAddress:{},rpcMessage:{}",
-                    serverAddress, rpcMessage);
+                LOGGER.error(
+                    "put message into basketMap offer failed, 
serverAddress:{},rpcMessage:{}",
+                    serverAddress,
+                    rpcMessage);
                 return null;
             }
             if (LOGGER.isDebugEnabled()) {
@@ -181,9 +193,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
                 Object response = messageFuture.get(timeoutMillis, 
TimeUnit.MILLISECONDS);
                 return response;
             } catch (Exception exx) {
-                LOGGER.error("wait response error:{},ip:{},request:{}", 
exx.getMessage(), serverAddress, rpcMessage.getBody());
+                LOGGER.error(
+                    "wait response error:{},ip:{},request:{}",
+                    exx.getMessage(),
+                    serverAddress,
+                    rpcMessage.getBody());
                 if (exx instanceof TimeoutException) {
-                    throw (TimeoutException)exx;
+                    throw (TimeoutException) exx;
                 } else {
                     throw new RuntimeException(exx);
                 }
@@ -192,7 +208,6 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             Channel channel = 
clientChannelManager.acquireChannel(serverAddress);
             return super.sendSync(channel, rpcMessage, timeoutMillis);
         }
-
     }
 
     @Override
@@ -209,17 +224,20 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
     public void sendAsyncRequest(Channel channel, Object msg) {
         if (channel == null) {
             LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
-            throw new FrameworkException(new Throwable("throw"), 
"frameworkException", FrameworkErrorCode.ChannelIsNotWritable);
+            throw new FrameworkException(
+                new Throwable("throw"), "frameworkException", 
FrameworkErrorCode.ChannelIsNotWritable);
         }
-        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof 
HeartbeatMessage
-            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
-            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+        RpcMessage rpcMessage = buildRequestMessage(
+            msg,
+            msg instanceof HeartbeatMessage
+                ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
+                : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
         Object body = rpcMessage.getBody();
         if (body instanceof MergeMessage) {
             Integer parentId = rpcMessage.getId();
-            mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
+            mergeMsgMap.put(parentId, (MergeMessage) rpcMessage.getBody());
             if (body instanceof MergedWarpMessage) {
-                for (Integer msgId : 
((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
+                for (Integer msgId : ((MergedWarpMessage) 
rpcMessage.getBody()).msgIds) {
                     childToParentMap.put(msgId, parentId);
                 }
             }
@@ -310,7 +328,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             } catch (Exception ignore) {
             }
         }
-        return StringUtils.isBlank(xid) ? 
String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
+        return StringUtils.isBlank(xid)
+            ? 
String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))
+            : xid;
     }
 
     private String getThreadPrefix() {
@@ -345,7 +365,6 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      */
     protected abstract long getRpcRequestTimeout();
 
-
     /**
      * Registers a channel event listener to receive channel events.
      * If the listener is already registered, it will not be added again.
@@ -356,7 +375,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
     public void registerChannelEventListener(ChannelEventListener 
channelEventListener) {
         if (channelEventListener != null) {
             channelEventListeners.addIfAbsent(channelEventListener);
-            LOGGER.info("register channel event listener: {}", 
channelEventListener.getClass().getName());
+            LOGGER.info(
+                "register channel event listener: {}",
+                channelEventListener.getClass().getName());
         }
     }
 
@@ -369,7 +390,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
     public void unregisterChannelEventListener(ChannelEventListener 
channelEventListener) {
         if (channelEventListener != null) {
             channelEventListeners.remove(channelEventListener);
-            LOGGER.info("unregister channel event listener: {}", 
channelEventListener.getClass().getName());
+            LOGGER.info(
+                "unregister channel event listener: {}",
+                channelEventListener.getClass().getName());
         }
     }
 
@@ -399,7 +422,7 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      * Fires an EXCEPTION event to all registered listeners and cleans up 
resources.
      *
      * @param channel the channel where the exception occurred
-     * @param cause the throwable that represents the exception
+     * @param cause   the throwable that represents the exception
      */
     public void onChannelException(Channel channel, Throwable cause) {
         fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause);
@@ -426,14 +449,16 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         if (channel == null) {
             return;
         }
-        ChannelException cause = new ChannelException(
-            String.format("Channel disconnected: %s", 
channel.remoteAddress()));
+        ChannelException cause =
+            new ChannelException(String.format("Channel disconnected: %s", 
channel.remoteAddress()));
 
         Set<Integer> messageIds = collectMessageIdsForChannel(channel.id());
         cleanupFuturesForMessageIds(messageIds, cause);
 
-        LOGGER.info("Cleaned up {} pending requests for disconnected channel: 
{}",
-            messageIds.size(), channel.remoteAddress());
+        LOGGER.info(
+            "Cleaned up {} pending requests for disconnected channel: {}",
+            messageIds.size(),
+            channel.remoteAddress());
     }
 
     /**
@@ -447,7 +472,8 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         Set<Integer> messageIds = new HashSet<>();
 
         String serverAddress = null;
-        for (Map.Entry<String, Channel> entry : 
clientChannelManager.getChannels().entrySet()) {
+        for (Map.Entry<String, Channel> entry :
+            clientChannelManager.getChannels().entrySet()) {
             Channel channel = entry.getValue();
             if (channelId.equals(channel.id())) {
                 serverAddress = entry.getKey();
@@ -483,10 +509,15 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      * This completes futures with an exception to notify waiting threads.
      *
      * @param messageIds the set of message IDs whose futures should be 
cleaned up
-     * @param cause the exception to set as the result for each future
+     * @param cause      the exception to set as the result for each future
      */
     private void cleanupFuturesForMessageIds(Set<Integer> messageIds, 
Exception cause) {
         for (Integer messageId : messageIds) {
+            Integer parentId = childToParentMap.remove(messageId);
+            if (parentId != null) {
+                mergeMsgMap.remove(parentId);
+            }
+
             MessageFuture future = futures.remove(messageId);
             if (future != null) {
                 future.setResultMessage(cause);
@@ -499,7 +530,7 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      * This is an overloaded version that calls {@link 
#fireChannelEvent(Channel, ChannelEventType, Throwable)}
      * with a null cause.
      *
-     * @param channel the channel associated with the event
+     * @param channel   the channel associated with the event
      * @param eventType the type of event that occurred
      */
     protected void fireChannelEvent(Channel channel, ChannelEventType 
eventType) {
@@ -511,9 +542,9 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
      * This method dispatches the event to the appropriate method on each 
listener
      * based on the event type.
      *
-     * @param channel the channel associated with the event
+     * @param channel   the channel associated with the event
      * @param eventType the type of event that occurred
-     * @param cause the cause of the event (maybe null for certain event types)
+     * @param cause     the cause of the event (maybe null for certain event 
types)
      */
     protected void fireChannelEvent(Channel channel, ChannelEventType 
eventType, Throwable cause) {
         for (ChannelEventListener listener : channelEventListeners) {
@@ -627,7 +658,7 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
         @Override
         public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
             if (msg instanceof RpcMessage) {
-                processMessage(ctx, (RpcMessage)msg);
+                processMessage(ctx, (RpcMessage) msg);
             } else {
                 LOGGER.error("rpcMessage type error");
             }
@@ -651,7 +682,8 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("channel inactive: {}", ctx.channel());
             }
-            clientChannelManager.releaseChannel(ctx.channel(), 
NetUtil.toStringAddress(ctx.channel().remoteAddress()));
+            clientChannelManager.releaseChannel(
+                ctx.channel(), 
NetUtil.toStringAddress(ctx.channel().remoteAddress()));
             super.channelInactive(ctx);
         }
 
@@ -664,7 +696,8 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
                         LOGGER.info("channel {} read idle.", ctx.channel());
                     }
                     try {
-                        String serverAddress = 
NetUtil.toStringAddress(ctx.channel().remoteAddress());
+                        String serverAddress =
+                            
NetUtil.toStringAddress(ctx.channel().remoteAddress());
                         clientChannelManager.invalidateObject(serverAddress, 
ctx.channel());
                     } catch (Exception exx) {
                         LOGGER.error(exx.getMessage());
@@ -687,8 +720,10 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
-            LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(),
-                NetUtil.toStringAddress(ctx.channel().remoteAddress()) + 
"connect exception. " + cause.getMessage(), cause);
+            LOGGER.error(
+                FrameworkErrorCode.ExceptionCaught.getErrCode(),
+                NetUtil.toStringAddress(ctx.channel().remoteAddress()) + 
"connect exception. " + cause.getMessage(),
+                cause);
             clientChannelManager.releaseChannel(ctx.channel(), 
getAddressFromChannel(ctx.channel()));
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("remove exception rm channel:{}", ctx.channel());
@@ -704,5 +739,4 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
             super.close(ctx, future);
         }
     }
-
 }
diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
index ccf848c377..a246f68ead 100644
--- a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
+++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java
@@ -82,14 +82,14 @@ class ChannelEventHandlerIntegrationTest {
 
         ServerBootstrap serverBootstrap = new ServerBootstrap();
         serverBootstrap
-                .group(bossGroup, workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) {
-                        ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, 
TimeUnit.SECONDS));
-                    }
-                });
+            .group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, 
TimeUnit.SECONDS));
+                }
+            });
 
         serverChannel = serverBootstrap.bind(SERVER_PORT).sync().channel();
     }
@@ -118,36 +118,36 @@ class ChannelEventHandlerIntegrationTest {
         idleEventLatch = new CountDownLatch(1);
 
         lenient()
-                .doAnswer(invocation -> {
-                    channelActiveLatch.countDown();
-                    return null;
-                })
-                .when(mockRemotingClient)
-                .onChannelActive(any(Channel.class));
+            .doAnswer(invocation -> {
+                channelActiveLatch.countDown();
+                return null;
+            })
+            .when(mockRemotingClient)
+            .onChannelActive(any(Channel.class));
 
         lenient()
-                .doAnswer(invocation -> {
-                    channelInactiveLatch.countDown();
-                    return null;
-                })
-                .when(mockRemotingClient)
-                .onChannelInactive(any(Channel.class));
+            .doAnswer(invocation -> {
+                channelInactiveLatch.countDown();
+                return null;
+            })
+            .when(mockRemotingClient)
+            .onChannelInactive(any(Channel.class));
 
         lenient()
-                .doAnswer(invocation -> {
-                    exceptionCaughtLatch.countDown();
-                    return null;
-                })
-                .when(mockRemotingClient)
-                .onChannelException(any(Channel.class), any(Throwable.class));
+            .doAnswer(invocation -> {
+                exceptionCaughtLatch.countDown();
+                return null;
+            })
+            .when(mockRemotingClient)
+            .onChannelException(any(Channel.class), any(Throwable.class));
 
         lenient()
-                .doAnswer(invocation -> {
-                    idleEventLatch.countDown();
-                    return null;
-                })
-                .when(mockRemotingClient)
-                .onChannelIdle(any(Channel.class));
+            .doAnswer(invocation -> {
+                idleEventLatch.countDown();
+                return null;
+            })
+            .when(mockRemotingClient)
+            .onChannelIdle(any(Channel.class));
     }
 
     @AfterEach
@@ -165,8 +165,8 @@ class ChannelEventHandlerIntegrationTest {
         connectClient();
 
         assertTrue(
-                channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
-                "Channel activation event was not detected");
+            channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+            "Channel activation event was not detected");
 
         verify(mockRemotingClient).onChannelActive(channelCaptor.capture());
         Channel capturedChannel = channelCaptor.getValue();
@@ -187,8 +187,8 @@ class ChannelEventHandlerIntegrationTest {
         clientChannel.close().sync();
 
         assertTrue(
-                channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
-                "Channel deactivation event was not detected");
+            channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+            "Channel deactivation event was not detected");
 
         verify(mockRemotingClient).onChannelInactive(any(Channel.class));
     }
@@ -200,14 +200,14 @@ class ChannelEventHandlerIntegrationTest {
         DefaultChannelGroup serverChannels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
         serverChannels.addAll(collectServerChannels(workerGroup));
         Channel serverSideClientChannel = serverChannels.stream()
-                .filter(ch -> ch.isActive() && ch.remoteAddress() != null)
-                .findFirst()
-                .orElseThrow(() -> new AssertionError("Failed to find client 
channel on server side"));
+            .filter(ch -> ch.isActive() && ch.remoteAddress() != null)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("Failed to find client 
channel on server side"));
 
         serverSideClientChannel.close().sync();
         assertTrue(
-                channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
-                "Channel inactive event was not detected on client side when 
server closed the connection");
+            channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
+            "Channel inactive event was not detected on client side when 
server closed the connection");
         verify(mockRemotingClient).onChannelInactive(any(Channel.class));
     }
 
@@ -265,16 +265,16 @@ class ChannelEventHandlerIntegrationTest {
                 SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) 
executor;
 
                 executor.submit(() -> {
-                            Iterator<Channel> it = 
eventLoop.registeredChannelsIterator();
-                            while (it.hasNext()) {
-                                Channel ch = it.next();
-                                if (ch.isActive() && ch instanceof 
SocketChannel) {
-                                    channels.add(ch);
-                                }
+                        Iterator<Channel> it = 
eventLoop.registeredChannelsIterator();
+                        while (it.hasNext()) {
+                            Channel ch = it.next();
+                            if (ch.isActive() && ch instanceof SocketChannel) {
+                                channels.add(ch);
                             }
-                            return null;
-                        })
-                        .sync();
+                        }
+                        return null;
+                    })
+                    .sync();
             }
         }
         return channels;
diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
index 34b15847db..2eaf463f03 100644
--- a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
+++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java
@@ -53,6 +53,7 @@ class ResourceCleanupTest {
     private Map<Integer, MergeMessage> mergeMsgMap;
     private Map<String, BlockingQueue<RpcMessage>> basketMap;
     private Map<String, Channel> channels;
+    private Map<Integer, Integer> childToParentMap;
 
     @BeforeEach
     void setUp() throws Exception {
@@ -77,6 +78,10 @@ class ResourceCleanupTest {
         Field channelsField = 
clientChannelManager.getClass().getDeclaredField("channels");
         channelsField.setAccessible(true);
         channels = (Map<String, Channel>) 
channelsField.get(clientChannelManager);
+
+        Field childToParentMapField = 
AbstractNettyRemotingClient.class.getDeclaredField("childToParentMap");
+        childToParentMapField.setAccessible(true);
+        childToParentMap = (Map<Integer, Integer>) 
childToParentMapField.get(client);
     }
 
     @Test
@@ -97,8 +102,12 @@ class ResourceCleanupTest {
         int parentId = 100;
         MergedWarpMessage mergeMessage = new MergedWarpMessage();
         mergeMessage.msgIds = new ArrayList<>();
+
         mergeMessage.msgIds.add(1);
+        childToParentMap.put(1, parentId);
+
         mergeMessage.msgIds.add(2);
+        childToParentMap.put(2, parentId);
 
         mergeMsgMap.put(parentId, mergeMessage);
 
@@ -115,6 +124,9 @@ class ResourceCleanupTest {
         assertFalse(futures.containsKey(1), "Future ID 1 has not been 
removed");
         assertFalse(futures.containsKey(2), "Future ID 2 has not been 
removed");
 
+        assertNull(childToParentMap.get(1), "Child to parent map should not 
contain ID 1");
+        assertNull(childToParentMap.get(2), "Child to parent map should not 
contain ID 2");
+
         assertThrows(RuntimeException.class, () -> messageFuture1.get(0, 
java.util.concurrent.TimeUnit.MILLISECONDS));
         assertThrows(RuntimeException.class, () -> messageFuture2.get(0, 
java.util.concurrent.TimeUnit.MILLISECONDS));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to