funky-eyes commented on code in PR #7337:
URL: https://github.com/apache/incubator-seata/pull/7337#discussion_r2097433133


##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -336,6 +345,201 @@ private String getThreadPrefix() {
      */
     protected abstract long getRpcRequestTimeout();
 
+
+    /**
+     * Registers a channel event listener to receive channel events.
+     * If the listener is already registered, it will not be added again.
+     *
+     * @param channelEventListener the channel event listener to register
+     */
+    @Override
+    public void registerChannelEventListener(ChannelEventListener 
channelEventListener) {
+        if (channelEventListener != null) {
+            channelEventListeners.addIfAbsent(channelEventListener);
+            LOGGER.info("register channel event listener: {}", 
channelEventListener.getClass().getName());
+        }
+    }
+
+    /**
+     * Unregisters a previously registered channel event listener.
+     *
+     * @param channelEventListener the channel event listener to unregister
+     */
+    @Override
+    public void unregisterChannelEventListener(ChannelEventListener 
channelEventListener) {
+        if (channelEventListener != null) {
+            channelEventListeners.remove(channelEventListener);
+            LOGGER.info("unregister channel event listener: {}", 
channelEventListener.getClass().getName());
+        }
+    }
+
+    /**
+     * Handles channel active events from Netty.
+     * Fires a CONNECTED event to all registered listeners.
+     *
+     * @param channel the channel that became active
+     */
+    public void onChannelActive(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.CONNECTED);
+    }
+
+    /**
+     * Handles channel inactive events from Netty.
+     * Fires a DISCONNECTED event to all registered listeners and cleans up 
resources.
+     *
+     * @param channel the channel that became inactive
+     */
+    public void onChannelInactive(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.DISCONNECTED);
+        cleanupResourcesForChannel(channel);
+    }
+
+    /**
+     * Handles channel exception events from Netty.
+     * Fires an EXCEPTION event to all registered listeners and cleans up 
resources.
+     *
+     * @param channel the channel where the exception occurred
+     * @param cause the throwable that represents the exception
+     */
+    public void onChannelException(Channel channel, Throwable cause) {
+        fireChannelEvent(channel, ChannelEventType.EXCEPTION, cause);
+        cleanupResourcesForChannel(channel);
+    }
+
+    /**
+     * Handles channel idle events from Netty.
+     * Fires an IDLE event to all registered listeners.
+     *
+     * @param channel the channel that became idle
+     */
+    public void onChannelIdle(Channel channel) {
+        fireChannelEvent(channel, ChannelEventType.IDLE);
+    }
+
+    /**
+     * Cleans up resources associated with a channel that has been 
disconnected.
+     * This includes collecting message IDs for the channel and cleaning up 
their futures.
+     *
+     * @param channel the channel for which resources should be cleaned up
+     */
+    protected void cleanupResourcesForChannel(Channel channel) {
+        if (channel == null) {
+            return;
+        }
+        ChannelException cause = new ChannelException(
+            String.format("Channel disconnected: %s", 
channel.remoteAddress()));
+
+        Set<Integer> messageIds = collectMessageIdsForChannel(channel.id());
+        cleanupFuturesForMessageIds(messageIds, cause);
+
+        LOGGER.info("Cleaned up {} pending requests for disconnected channel: 
{}",
+            messageIds.size(), channel.remoteAddress());
+    }
+
+    /**
+     * Collects message IDs associated with a specific channel.
+     * This is used during channel cleanup to identify pending requests.
+     *
+     * @param channelId the ID of the channel
+     * @return a set of message IDs associated with the channel
+     */
+    private Set<Integer> collectMessageIdsForChannel(ChannelId channelId) {
+        Set<Integer> messageIds = new HashSet<>();
+
+        String serverAddress = null;
+        for (Map.Entry<String, Channel> entry : 
clientChannelManager.getChannels().entrySet()) {
+            Channel channel = entry.getValue();
+            if (channelId.equals(channel.id())) {
+                serverAddress = entry.getKey();
+                break;
+            }
+        }
+
+        if (serverAddress == null) {
+            return messageIds;
+        }
+
+        Iterator<Map.Entry<Integer, MergeMessage>> it = 
mergeMsgMap.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Integer, MergeMessage> entry = it.next();
+            MergeMessage mergeMessage = entry.getValue();
+
+            if (mergeMessage instanceof MergedWarpMessage) {
+                MergedWarpMessage warpMessage = (MergedWarpMessage) 
mergeMessage;
+
+                BlockingQueue<RpcMessage> basket = 
basketMap.get(serverAddress);
+                if (basket != null && !basket.isEmpty()) {
+                    messageIds.addAll(warpMessage.msgIds);
+                    it.remove();
+                }
+            }
+        }
+
+        return messageIds;
+    }
+
+    /**
+     * Cleans up futures for a set of message IDs.
+     * This completes futures with an exception to notify waiting threads.
+     *
+     * @param messageIds the set of message IDs whose futures should be 
cleaned up
+     * @param cause the exception to set as the result for each future
+     */
+    private void cleanupFuturesForMessageIds(Set<Integer> messageIds, 
Exception cause) {
+        for (Integer messageId : messageIds) {

Review Comment:
   ```
       @Override
       public void sendAsyncRequest(Channel channel, Object msg) {
           if (channel == null) {
               LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
               throw new FrameworkException(new Throwable("throw"), 
"frameworkException", FrameworkErrorCode.ChannelIsNotWritable);
           }
           RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof 
HeartbeatMessage
               ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
               : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
           Object body = rpcMessage.getBody();
           if (body instanceof MergeMessage) {
               Integer parentId = rpcMessage.getId();
               mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody());
               if (body instanceof MergedWarpMessage) {
                   for (Integer msgId : 
((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
                       childToParentMap.put(msgId, parentId);
                   }
               }
           }
           super.sendAsync(channel, rpcMessage);
       }
   ```
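   It seems that `childToParentMap` is never cleared: `sendAsyncRequest` adds a
   child-to-parent entry for every message ID in a `MergedWarpMessage`, but the
   channel cleanup added in this PR only removes entries from `mergeMsgMap` and
   does not appear to remove the corresponding `childToParentMap` entries.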



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to