YvCeung commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659727977


##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -90,35 +103,68 @@ public void init() {
     public void onChangeEvent(ClusterChangeEvent event) {
         if (event.getTerm() > 0) {
             GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
-            // Notifications are made of changes in cluster information
+            // Notify all watchers of cluster information changes
             Optional.ofNullable(WATCHERS.remove(event.getGroup()))
                     .ifPresent(watchers -> 
watchers.parallelStream().forEach(this::notifyWatcher));
         }
     }
 
     private void notifyWatcher(Watcher<HttpContext> watcher) {
-        watcher.setDone(true);
-        sendWatcherResponse(watcher, HttpResponseStatus.OK);
-    }
+        HttpContext context = watcher.getAsyncContext();
+        boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+
+        if (!isHttp2) {
+            watcher.setDone(true);
+        }
+
+        boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher, 
false);
+        sendWatcherResponse(watcher, HttpResponseStatus.OK, false, 
isFirstResponse);
+        if (isFirstResponse && isHttp2) {
+            HTTP2_HEADERS_SENT.put(watcher, true);
+        }
 
-    private void sendWatcherResponse(Watcher<HttpContext> watcher, 
HttpResponseStatus nettyStatus) {
+        // Update watcher's term to the latest term to prevent infinite loop
+        // This ensures that when registryWatcher is called, it won't trigger 
notifyWatcher again
         String group = watcher.getGroup();
+        Long latestTerm = GROUP_UPDATE_TERM.get(group);
+        if (latestTerm != null && latestTerm > watcher.getTerm()) {
+            watcher.setTerm(latestTerm);
+        }
+
+        // For HTTP/2, re-register the watcher to continue listening for 
future updates
+        if (isHttp2 && !watcher.isDone()) {
+            registryWatcher(watcher);
+        }
+    }
+    /**
+     * Send watcher response to the client.
+     *
+     * @param watcher     the watcher instance
+     * @param nettyStatus the HTTP status code
+     * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+     * @param sendHeaders whether to send HTTP/2 headers frame (only needed 
for first response)
+     */
+    private void sendWatcherResponse(
+            Watcher<HttpContext> watcher, HttpResponseStatus nettyStatus, 
boolean closeStream, boolean sendHeaders) {
+
         HttpContext context = watcher.getAsyncContext();
         if (!(context instanceof HttpContext)) {
             logger.warn(
                     "Unsupported context type for watcher on group {}: {}",
-                    group,
+                    watcher.getGroup(),
                     context != null ? context.getClass().getName() : "null");
             return;
         }

Review Comment:
   I think the type check here is still necessary, so I kept the existing 
logic as it was and only changed the format of the log message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to