This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 5cbc488623 optimize: update resource cleanup logic for channel disconnection (#7360) 5cbc488623 is described below commit 5cbc48862364b8959dae4c14531a64afff2626d7 Author: Yongjun Hong <yongj...@apache.org> AuthorDate: Sat May 24 22:14:54 2025 +0900 optimize: update resource cleanup logic for channel disconnection (#7360) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 +- .../rpc/netty/AbstractNettyRemotingClient.java | 146 +++++++++++++-------- .../netty/ChannelEventHandlerIntegrationTest.java | 100 +++++++------- .../seata/core/rpc/netty/ResourceCleanupTest.java | 12 ++ 5 files changed, 154 insertions(+), 107 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index cf0360be89..a8fef0c576 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -40,6 +40,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#7344](https://github.com/apache/incubator-seata/pull/7344)] raft mode performs transaction size check in advance - [[#7337](https://github.com/apache/incubator-seata/pull/7337)] Add ChannelEventListener support to prevent memory leaks - [[#7350](https://github.com/apache/incubator-seata/pull/7350)] optimize codecov.yml +- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] Update resource cleanup logic for channel disconnection ### security: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 2cafefb523..dc139bd5be 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -38,7 +38,7 @@ - [[#7344](https://github.com/apache/incubator-seata/pull/7344)] raft模式提前检查事务大小 - [[#7337](https://github.com/apache/incubator-seata/pull/7337)] 添加 ChannelEventListener 支持以防止内存泄漏 - [[#7350](https://github.com/apache/incubator-seata/pull/7350)] 优化单测覆盖配置 - +- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] 更新通道断开连接时的资源清理逻辑 ### security: diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2867cca2b6..383b6da509 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -16,8 +16,18 @@ */ package org.apache.seata.core.rpc.netty; +import static org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelException; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; +import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.EventExecutorGroup; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.HashSet; @@ -35,14 +45,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.concurrent.EventExecutorGroup; import org.apache.seata.common.exception.FrameworkErrorCode; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; @@ -69,11 +71,8 @@ import org.apache.seata.discovery.registry.RegistryFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.seata.common.exception.FrameworkErrorCode.NoAvailableService; - /** * The netty remoting client. - * */ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { @@ -105,7 +104,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} * {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()} */ - protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap<String /*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = + new ConcurrentHashMap<>(); + private final NettyClientBootstrap clientBootstrap; private final NettyClientChannelManager clientChannelManager; private final NettyPoolKey.TransactionRole transactionRole; @@ -115,17 +116,23 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting @Override public void init() { - timerExecutor.scheduleAtFixedRate(() -> { - try { - clientChannelManager.reconnect(getTransactionServiceGroup()); - } catch (Exception ex) { - LOGGER.warn("reconnect server failed. {}", ex.getMessage()); - } - }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); + timerExecutor.scheduleAtFixedRate( + () -> { + try { + clientChannelManager.reconnect(getTransactionServiceGroup()); + } catch (Exception ex) { + LOGGER.warn("reconnect server failed. {}", ex.getMessage()); + } + }, + SCHEDULE_DELAY_MILLS, + SCHEDULE_INTERVAL_MILLS, + TimeUnit.MILLISECONDS); if (this.isEnableClientBatchSendRequest()) { - mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, + mergeSendExecutorService = new ThreadPoolExecutor( + MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD, - KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, + KEEP_ALIVE_TIME, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); mergeSendExecutorService.submit(new MergedSendRunnable()); @@ -134,8 +141,11 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting clientBootstrap.start(); } - public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, - ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) { + public AbstractNettyRemotingClient( + NettyClientConfig nettyClientConfig, + EventExecutorGroup eventExecutorGroup, + ThreadPoolExecutor messageExecutor, + NettyPoolKey.TransactionRole transactionRole) { super(messageExecutor); this.transactionRole = transactionRole; clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole); @@ -161,11 +171,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap - BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, - key -> new LinkedBlockingQueue<>()); + BlockingQueue<RpcMessage> basket = + CollectionUtils.computeIfAbsent(basketMap, serverAddress, key -> new LinkedBlockingQueue<>()); if (!basket.offer(rpcMessage)) { - LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", - serverAddress, rpcMessage); + LOGGER.error( + "put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", + serverAddress, + rpcMessage); return null; } if (LOGGER.isDebugEnabled()) { @@ -181,9 +193,13 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting Object response = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); return response; } catch (Exception exx) { - LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody()); + LOGGER.error( + "wait response error:{},ip:{},request:{}", + exx.getMessage(), + serverAddress, + rpcMessage.getBody()); if (exx instanceof TimeoutException) { - throw (TimeoutException)exx; + throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } @@ -192,7 +208,6 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); } - } @Override @@ -209,17 +224,20 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { LOGGER.warn("sendAsyncRequest nothing, caused by null channel."); - throw new FrameworkException(new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable); + throw new FrameworkException( + new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable); } - RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage - ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST - : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + RpcMessage rpcMessage = buildRequestMessage( + msg, + msg instanceof HeartbeatMessage + ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST + : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); Object body = rpcMessage.getBody(); if (body instanceof MergeMessage) { Integer parentId = rpcMessage.getId(); - mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody()); + mergeMsgMap.put(parentId, (MergeMessage) rpcMessage.getBody()); if (body instanceof MergedWarpMessage) { - for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) { + for (Integer msgId : ((MergedWarpMessage) rpcMessage.getBody()).msgIds) { childToParentMap.put(msgId, parentId); } } @@ -310,7 +328,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting } catch (Exception ignore) { } } - return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid; + return StringUtils.isBlank(xid) + ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) + : xid; } private String getThreadPrefix() { @@ -345,7 +365,6 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting */ protected abstract long getRpcRequestTimeout(); - /** * Registers a channel event listener to receive channel events. * If the listener is already registered, it will not be added again. @@ -356,7 +375,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting public void registerChannelEventListener(ChannelEventListener channelEventListener) { if (channelEventListener != null) { channelEventListeners.addIfAbsent(channelEventListener); - LOGGER.info("register channel event listener: {}", channelEventListener.getClass().getName()); + LOGGER.info( + "register channel event listener: {}", + channelEventListener.getClass().getName()); } } @@ -369,7 +390,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting public void unregisterChannelEventListener(ChannelEventListener channelEventListener) { if (channelEventListener != null) { channelEventListeners.remove(channelEventListener); - LOGGER.info("unregister channel event listener: {}", channelEventListener.getClass().getName()); + LOGGER.info( + "unregister channel event listener: {}", + channelEventListener.getClass().getName()); } } @@ -399,7 +422,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * Fires an EXCEPTION event to all registered listeners and cleans up resources. * * @param channel the channel where the exception occurred - * @param cause the throwable that represents the exception + * @param cause the throwable that represents the exception */ public void onChannelException(Channel channel, Throwable cause) { fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause); @@ -426,14 +449,16 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting if (channel == null) { return; } - ChannelException cause = new ChannelException( - String.format("Channel disconnected: %s", channel.remoteAddress())); + ChannelException cause = + new ChannelException(String.format("Channel disconnected: %s", channel.remoteAddress())); Set<Integer> messageIds = collectMessageIdsForChannel(channel.id()); cleanupFuturesForMessageIds(messageIds, cause); - LOGGER.info("Cleaned up {} pending requests for disconnected channel: {}", - messageIds.size(), channel.remoteAddress()); + LOGGER.info( + "Cleaned up {} pending requests for disconnected channel: {}", + messageIds.size(), + channel.remoteAddress()); } /** @@ -447,7 +472,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting Set<Integer> messageIds = new HashSet<>(); String serverAddress = null; - for (Map.Entry<String, Channel> entry : clientChannelManager.getChannels().entrySet()) { + for (Map.Entry<String, Channel> entry : + clientChannelManager.getChannels().entrySet()) { Channel channel = entry.getValue(); if (channelId.equals(channel.id())) { serverAddress = entry.getKey(); @@ -483,10 +509,15 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * This completes futures with an exception to notify waiting threads. * * @param messageIds the set of message IDs whose futures should be cleaned up - * @param cause the exception to set as the result for each future + * @param cause the exception to set as the result for each future */ private void cleanupFuturesForMessageIds(Set<Integer> messageIds, Exception cause) { for (Integer messageId : messageIds) { + Integer parentId = childToParentMap.remove(messageId); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } + MessageFuture future = futures.remove(messageId); if (future != null) { future.setResultMessage(cause); @@ -499,7 +530,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * This is an overloaded version that calls {@link #fireChannelEvent(Channel, ChannelEventType, Throwable)} * with a null cause. * - * @param channel the channel associated with the event + * @param channel the channel associated with the event * @param eventType the type of event that occurred */ protected void fireChannelEvent(Channel channel, ChannelEventType eventType) { @@ -511,9 +542,9 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting * This method dispatches the event to the appropriate method on each listener * based on the event type. * - * @param channel the channel associated with the event + * @param channel the channel associated with the event * @param eventType the type of event that occurred - * @param cause the cause of the event (maybe null for certain event types) + * @param cause the cause of the event (maybe null for certain event types) */ protected void fireChannelEvent(Channel channel, ChannelEventType eventType, Throwable cause) { for (ChannelEventListener listener : channelEventListeners) { @@ -627,7 +658,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcMessage) { - processMessage(ctx, (RpcMessage)msg); + processMessage(ctx, (RpcMessage) msg); } else { LOGGER.error("rpcMessage type error"); } @@ -651,7 +682,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting if (LOGGER.isInfoEnabled()) { LOGGER.info("channel inactive: {}", ctx.channel()); } - clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); + clientChannelManager.releaseChannel( + ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress())); super.channelInactive(ctx); } @@ -664,7 +696,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting LOGGER.info("channel {} read idle.", ctx.channel()); } try { - String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress()); + String serverAddress = + NetUtil.toStringAddress(ctx.channel().remoteAddress()); clientChannelManager.invalidateObject(serverAddress, ctx.channel()); } catch (Exception exx) { LOGGER.error(exx.getMessage()); @@ -687,8 +720,10 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), - NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause); + LOGGER.error( + FrameworkErrorCode.ExceptionCaught.getErrCode(), + NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), + cause); clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel())); if (LOGGER.isInfoEnabled()) { LOGGER.info("remove exception rm channel:{}", ctx.channel()); @@ -704,5 +739,4 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting super.close(ctx, future); } } - } diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java index ccf848c377..a246f68ead 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ChannelEventHandlerIntegrationTest.java @@ -82,14 +82,14 @@ class ChannelEventHandlerIntegrationTest { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap - .group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, TimeUnit.SECONDS)); - } - }); + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new IdleStateHandler(1, 0, 0, TimeUnit.SECONDS)); + } + }); serverChannel = serverBootstrap.bind(SERVER_PORT).sync().channel(); } @@ -118,36 +118,36 @@ class ChannelEventHandlerIntegrationTest { idleEventLatch = new CountDownLatch(1); lenient() - .doAnswer(invocation -> { - channelActiveLatch.countDown(); - return null; - }) - .when(mockRemotingClient) - .onChannelActive(any(Channel.class)); + .doAnswer(invocation -> { + channelActiveLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelActive(any(Channel.class)); lenient() - .doAnswer(invocation -> { - channelInactiveLatch.countDown(); - return null; - }) - .when(mockRemotingClient) - .onChannelInactive(any(Channel.class)); + .doAnswer(invocation -> { + channelInactiveLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelInactive(any(Channel.class)); lenient() - .doAnswer(invocation -> { - exceptionCaughtLatch.countDown(); - return null; - }) - .when(mockRemotingClient) - .onChannelException(any(Channel.class), any(Throwable.class)); + .doAnswer(invocation -> { + exceptionCaughtLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelException(any(Channel.class), any(Throwable.class)); lenient() - .doAnswer(invocation -> { - idleEventLatch.countDown(); - return null; - }) - .when(mockRemotingClient) - .onChannelIdle(any(Channel.class)); + .doAnswer(invocation -> { + idleEventLatch.countDown(); + return null; + }) + .when(mockRemotingClient) + .onChannelIdle(any(Channel.class)); } @AfterEach @@ -165,8 +165,8 @@ class ChannelEventHandlerIntegrationTest { connectClient(); assertTrue( - channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), - "Channel activation event was not detected"); + channelActiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel activation event was not detected"); verify(mockRemotingClient).onChannelActive(channelCaptor.capture()); Channel capturedChannel = channelCaptor.getValue(); @@ -187,8 +187,8 @@ class ChannelEventHandlerIntegrationTest { clientChannel.close().sync(); assertTrue( - channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), - "Channel deactivation event was not detected"); + channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel deactivation event was not detected"); verify(mockRemotingClient).onChannelInactive(any(Channel.class)); } @@ -200,14 +200,14 @@ class ChannelEventHandlerIntegrationTest { DefaultChannelGroup serverChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); serverChannels.addAll(collectServerChannels(workerGroup)); Channel serverSideClientChannel = serverChannels.stream() - .filter(ch -> ch.isActive() && ch.remoteAddress() != null) - .findFirst() - .orElseThrow(() -> new AssertionError("Failed to find client channel on server side")); + .filter(ch -> ch.isActive() && ch.remoteAddress() != null) + .findFirst() + .orElseThrow(() -> new AssertionError("Failed to find client channel on server side")); serverSideClientChannel.close().sync(); assertTrue( - channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), - "Channel inactive event was not detected on client side when server closed the connection"); + channelInactiveLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Channel inactive event was not detected on client side when server closed the connection"); verify(mockRemotingClient).onChannelInactive(any(Channel.class)); } @@ -265,16 +265,16 @@ class ChannelEventHandlerIntegrationTest { SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) executor; executor.submit(() -> { - Iterator<Channel> it = eventLoop.registeredChannelsIterator(); - while (it.hasNext()) { - Channel ch = it.next(); - if (ch.isActive() && ch instanceof SocketChannel) { - channels.add(ch); - } + Iterator<Channel> it = eventLoop.registeredChannelsIterator(); + while (it.hasNext()) { + Channel ch = it.next(); + if (ch.isActive() && ch instanceof SocketChannel) { + channels.add(ch); } - return null; - }) - .sync(); + } + return null; + }) + .sync(); } } return channels; diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java index 34b15847db..2eaf463f03 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/ResourceCleanupTest.java @@ -53,6 +53,7 @@ class ResourceCleanupTest { private Map<Integer, MergeMessage> mergeMsgMap; private Map<String, BlockingQueue<RpcMessage>> basketMap; private Map<String, Channel> channels; + private Map<Integer, Integer> childToParentMap; @BeforeEach void setUp() throws Exception { @@ -77,6 +78,10 @@ class ResourceCleanupTest { Field channelsField = clientChannelManager.getClass().getDeclaredField("channels"); channelsField.setAccessible(true); channels = (Map<String, Channel>) channelsField.get(clientChannelManager); + + Field childToParentMapField = AbstractNettyRemotingClient.class.getDeclaredField("childToParentMap"); + childToParentMapField.setAccessible(true); + childToParentMap = (Map<Integer, Integer>) childToParentMapField.get(client); } @Test @@ -97,8 +102,12 @@ class ResourceCleanupTest { int parentId = 100; MergedWarpMessage mergeMessage = new MergedWarpMessage(); mergeMessage.msgIds = new ArrayList<>(); + mergeMessage.msgIds.add(1); + childToParentMap.put(1, parentId); + mergeMessage.msgIds.add(2); + childToParentMap.put(2, parentId); mergeMsgMap.put(parentId, mergeMessage); @@ -115,6 +124,9 @@ class ResourceCleanupTest { assertFalse(futures.containsKey(1), "Future ID 1 has not been removed"); assertFalse(futures.containsKey(2), "Future ID 2 has not been removed"); + assertNull(childToParentMap.get(1), "Child to parent map should not contain ID 1"); + assertNull(childToParentMap.get(2), "Child to parent map should not contain ID 2"); + assertThrows(RuntimeException.class, () -> messageFuture1.get(0, java.util.concurrent.TimeUnit.MILLISECONDS)); assertThrows(RuntimeException.class, () -> messageFuture2.get(0, java.util.concurrent.TimeUnit.MILLISECONDS)); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org