funky-eyes commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2666816518


##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
 
     private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
 
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
 
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if (!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);

Review Comment:
   > The potential event-loss issue has now been addressed. Separate queues
   > have been created to store HTTP1 and HTTP2 watchers. HTTP2 watchers are
   > no longer removed and re-registered periodically; the scheduled task only
   > checks whether the channel is still active, to prevent memory leaks.
   > 
   > ```java
   > // Separate watchers for HTTP/1.1 (one-time requests) and HTTP/2 (long-lived connections)
   >     private static final Map<String, Queue<Watcher<HttpContext>>> HTTP1_WATCHERS = new ConcurrentHashMap<>();
   >     private static final Map<String, Queue<Watcher<HttpContext>>> HTTP2_WATCHERS = new ConcurrentHashMap<>();
   > 
   > // Check HTTP/2 watchers for connection validity (don't remove, just check)
   >                     for (Map.Entry<String, Queue<Watcher<HttpContext>>> entry : HTTP2_WATCHERS.entrySet()) {
   >                         String group = entry.getKey();
   >                         Queue<Watcher<HttpContext>> watchers = entry.getValue();
   >                         if (watchers == null || watchers.isEmpty()) {
   >                             continue;
   >                         }
   > 
   >                         // Create snapshot to avoid concurrent modification
   >                         List<Watcher<HttpContext>> watchersToCheck = new ArrayList<>(watchers);
   > 
   >                         watchersToCheck.forEach(watcher -> {
   >                             HttpContext context = watcher.getAsyncContext();
   >                             if (!context.getContext().channel().isActive()) {
   >                                 // Remove invalid watcher
   >                                 watchers.remove(watcher);
   >                                 watcher.setDone(true);
   >                                 HTTP2_HEADERS_SENT.remove(watcher);
   >                                 logger.debug("Removed inactive HTTP/2 watcher for group: {}", group);
   >                             }
   >                         });
   >                     }
   >                 },
   > ```
   
   We also need to consider another issue: since the response is written back
over HTTP/2 + SSE, are concurrent writes to the same SSE response thread-safe?
There could also be an out-of-order delivery problem. For example, if two
events are generated and event 1 reaches the client after event 2, the newer
event could be overwritten by the older one.
   To address this, should we guarantee in-order delivery and add term
validation? Specifically, if the term of an event already written to the
response is greater than the term of a later event, the later event should be
discarded.
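
   A minimal sketch of the term check suggested above, to make the idea
concrete. `TermGuardedSseSender` and its `writer` callback are hypothetical
names, not part of this PR; the callback stands in for whatever actually writes
the SSE frame to the channel. The sketch assumes one sender per watcher, and it
serializes writes with a lock so that the term comparison and the write happen
atomically.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical helper (not in the PR): one instance per HTTP/2 watcher.
 * Serializes writes and discards events whose term is older than the
 * term already written to this response.
 */
class TermGuardedSseSender {
    private final Object writeLock = new Object();
    // Highest term written so far; guarded by writeLock.
    private long lastSentTerm = Long.MIN_VALUE;
    // Assumption: stands in for the real SSE write to the channel.
    private final LongConsumer writer;

    TermGuardedSseSender(LongConsumer writer) {
        this.writer = writer;
    }

    /**
     * Writes the event unless a newer term has already been written.
     *
     * @return true if the event was written, false if it was discarded
     */
    boolean send(long term) {
        synchronized (writeLock) {
            if (lastSentTerm > term) {
                // A newer event already reached the response; drop the stale one.
                return false;
            }
            lastSentTerm = term;
            writer.accept(term);
            return true;
        }
    }
}
```

   With this shape, two threads calling `send` for the same watcher cannot
interleave, and an event with term 1 that arrives after term 2 has been written
is dropped instead of overwriting it. Whether an event with an equal term
should also be dropped is a policy choice this sketch leaves open (it is
written again here).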



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

