funky-eyes commented on code in PR #7337: URL: https://github.com/apache/incubator-seata/pull/7337#discussion_r2097433133
########## core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java: ########## @@ -336,6 +345,201 @@ private String getThreadPrefix() { */ protected abstract long getRpcRequestTimeout(); + + /** + * Registers a channel event listener to receive channel events. + * If the listener is already registered, it will not be added again. + * + * @param channelEventListener the channel event listener to register + */ + @Override + public void registerChannelEventListener(ChannelEventListener channelEventListener) { + if (channelEventListener != null) { + channelEventListeners.addIfAbsent(channelEventListener); + LOGGER.info("register channel event listener: {}", channelEventListener.getClass().getName()); + } + } + + /** + * Unregisters a previously registered channel event listener. + * + * @param channelEventListener the channel event listener to unregister + */ + @Override + public void unregisterChannelEventListener(ChannelEventListener channelEventListener) { + if (channelEventListener != null) { + channelEventListeners.remove(channelEventListener); + LOGGER.info("unregister channel event listener: {}", channelEventListener.getClass().getName()); + } + } + + /** + * Handles channel active events from Netty. + * Fires a CONNECTED event to all registered listeners. + * + * @param channel the channel that became active + */ + public void onChannelActive(Channel channel) { + fireChannelEvent(channel, ChannelEventType.CONNECTED); + } + + /** + * Handles channel inactive events from Netty. + * Fires a DISCONNECTED event to all registered listeners and cleans up resources. + * + * @param channel the channel that became inactive + */ + public void onChannelInactive(Channel channel) { + fireChannelEvent(channel, ChannelEventType.DISCONNECTED); + cleanupResourcesForChannel(channel); + } + + /** + * Handles channel exception events from Netty. + * Fires an EXCEPTION event to all registered listeners and cleans up resources. + * + * @param channel the channel where the exception occurred + * @param cause the throwable that represents the exception + */ + public void onChannelException(Channel channel, Throwable cause) { + fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause); + cleanupResourcesForChannel(channel); + } + + /** + * Handles channel idle events from Netty. + * Fires an IDLE event to all registered listeners. + * + * @param channel the channel that became idle + */ + public void onChannelIdle(Channel channel) { + fireChannelEvent(channel, ChannelEventType.IDLE); + } + + /** + * Cleans up resources associated with a channel that has been disconnected. + * This includes collecting message IDs for the channel and cleaning up their futures. + * + * @param channel the channel for which resources should be cleaned up + */ + protected void cleanupResourcesForChannel(Channel channel) { + if (channel == null) { + return; + } + ChannelException cause = new ChannelException( + String.format("Channel disconnected: %s", channel.remoteAddress())); + + Set<Integer> messageIds = collectMessageIdsForChannel(channel.id()); + cleanupFuturesForMessageIds(messageIds, cause); + + LOGGER.info("Cleaned up {} pending requests for disconnected channel: {}", + messageIds.size(), channel.remoteAddress()); + } + + /** + * Collects message IDs associated with a specific channel. + * This is used during channel cleanup to identify pending requests. + * + * @param channelId the ID of the channel + * @return a set of message IDs associated with the channel + */ + private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) { + Set<Integer> messageIds = new HashSet<>(); + + String serverAddress = null; + for (Map.Entry<String, Channel> entry : clientChannelManager.getChannels().entrySet()) { + Channel channel = entry.getValue(); + if (channelId.equals(channel.id())) { + serverAddress = entry.getKey(); + break; + } + } + + if (serverAddress == null) { + return messageIds; + } + + Iterator<Map.Entry<Integer, MergeMessage>> it = mergeMsgMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Integer, MergeMessage> entry = it.next(); + MergeMessage mergeMessage = entry.getValue(); + + if (mergeMessage instanceof MergedWarpMessage) { + MergedWarpMessage warpMessage = (MergedWarpMessage) mergeMessage; + + BlockingQueue<RpcMessage> basket = basketMap.get(serverAddress); + if (basket != null && !basket.isEmpty()) { + messageIds.addAll(warpMessage.msgIds); + it.remove(); + } + } + } + + return messageIds; + } + + /** + * Cleans up futures for a set of message IDs. + * This completes futures with an exception to notify waiting threads. + * + * @param messageIds the set of message IDs whose futures should be cleaned up + * @param cause the exception to set as the result for each future + */ + private void cleanupFuturesForMessageIds(Set<Integer> messageIds, Exception cause) { + for (Integer messageId : messageIds) { Review Comment: ``` @Override public void sendAsyncRequest(Channel channel, Object msg) { if (channel == null) { LOGGER.warn("sendAsyncRequest nothing, caused by null channel."); throw new FrameworkException(new Throwable("throw"), "frameworkException", FrameworkErrorCode.ChannelIsNotWritable); } RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); Object body = rpcMessage.getBody(); if (body instanceof MergeMessage) { Integer parentId = rpcMessage.getId(); mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody()); if (body instanceof MergedWarpMessage) { for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) { childToParentMap.put(msgId, parentId); } } } super.sendAsync(channel, rpcMessage); } ``` It seems that childToParentMap is not being cleared. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org