YvCeung commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2665164922


##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements 
ClusterChangeListener {
 
     private static final Map<String, Long> GROUP_UPDATE_TERM = new 
ConcurrentHashMap<>();
 
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT 
= new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new 
NamedThreadFactory("long-polling", 1));
 
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> 
watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= 
watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, 
HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = 
watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof 
HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if 
(!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);

Review Comment:
   Currently, optimizations have been made for potential loss issues. Different 
queues have been created for watching HTTP1 and HTTP2 respectively to store 
data. HTTP2 does not remove and re-register regularly. The scheduled tasks only 
check whether the channel is valid to prevent memory leaks.
   ```java
   // Separate watchers for HTTP/1.1 (one-time requests) and HTTP/2 (long-lived 
connections)
       private static final Map<String, Queue<Watcher<HttpContext>>> 
HTTP1_WATCHERS = new ConcurrentHashMap<>();
       private static final Map<String, Queue<Watcher<HttpContext>>> 
HTTP2_WATCHERS = new ConcurrentHashMap<>();
   
   // Check HTTP/2 watchers for connection validity (don't remove, just check)
                       for (Map.Entry<String, Queue<Watcher<HttpContext>>> 
entry : HTTP2_WATCHERS.entrySet()) {
                           String group = entry.getKey();
                           Queue<Watcher<HttpContext>> watchers = 
entry.getValue();
                           if (watchers == null || watchers.isEmpty()) {
                               continue;
                           }
   
                           // Create snapshot to avoid concurrent modification
                           List<Watcher<HttpContext>> watchersToCheck = new 
ArrayList<>(watchers);
   
                           watchersToCheck.forEach(watcher -> {
                               HttpContext context = watcher.getAsyncContext();
                               if (!context.getContext().channel().isActive()) {
                                   // Remove invalid watcher
                                   watchers.remove(watcher);
                                   watcher.setDone(true);
                                   HTTP2_HEADERS_SENT.remove(watcher);
                                   logger.debug("Removed inactive HTTP/2 
watcher for group: {}", group);
                               }
                           });
                       }
                   },
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to