Copilot commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2735045941


##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
     @Async
     public void onChangeEvent(ClusterChangeEvent event) {
         if (event.getTerm() > 0) {
-            GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
-            // Notifications are made of changes in cluster information
-            Optional.ofNullable(WATCHERS.remove(event.getGroup()))
-                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(this::notifyWatcher));
+            String group = event.getGroup();
+            Long eventTerm = event.getTerm();
+
+            Long currentTerm = GROUP_UPDATE_TERM.get(group);
+            if (currentTerm != null && eventTerm <= currentTerm) {
+                logger.info(
+                        "Discarding outdated event with term {} for group {}, 
current term is {}",
+                        eventTerm,
+                        group,
+                        currentTerm);
+                return;
+            }
+
+            GROUP_UPDATE_TERM.put(group, eventTerm);
+
+            // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+            Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(watcher -> {
+                        notifyWatcher(watcher, eventTerm);
+                        // HTTP/1.1 watcher is done after notification
+                        watcher.setDone(true);
+                    }));
+
+            // Handle HTTP/2 watchers: notify without removing (long-lived 
connection)
+            Queue<Watcher<HttpContext>> http2Watchers = 
HTTP2_WATCHERS.get(group);
+            if (http2Watchers != null && !http2Watchers.isEmpty()) {
+                List<Watcher<HttpContext>> watchersToNotify = new 
ArrayList<>(http2Watchers);
+                watchersToNotify.forEach(watcher -> {
+                    if 
(watcher.getAsyncContext().getContext().channel().isActive() && 
!watcher.isDone()) {
+                        if (eventTerm > watcher.getTerm()) {
+                            notifyWatcher(watcher, eventTerm);
+                        } else {
+                            logger.info(
+                                    "Skipping notification for watcher with 
term {} >= event term {} for group {}",
+                                    watcher.getTerm(),
+                                    eventTerm,
+                                    group);
+                        }
+                    } else {
+                        // Remove inactive watcher
+                        http2Watchers.remove(watcher);
+                        HTTP2_HEADERS_SENT.remove(watcher);
+                    }
+                });
+            }
         }
     }
 
-    private void notifyWatcher(Watcher<HttpContext> watcher) {
-        watcher.setDone(true);
-        sendWatcherResponse(watcher, HttpResponseStatus.OK);
+    private void notifyWatcher(Watcher<HttpContext> watcher, Long eventTerm) {
+        HttpContext context = watcher.getAsyncContext();
+        boolean isHttp2 = context.isHttp2();
+
+        if (!isHttp2) {
+            watcher.setDone(true);
+        }
+
+        boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher, 
false);
+        sendWatcherResponse(watcher, HttpResponseStatus.OK, false, 
isFirstResponse);
+        if (isFirstResponse && isHttp2) {
+            HTTP2_HEADERS_SENT.put(watcher, true);
+        }
+
+        if (eventTerm != null && eventTerm > watcher.getTerm()) {
+            watcher.setTerm(eventTerm);
+        }
     }
+    /**
+     * Send watcher response to the client.
+     *
+     * @param watcher     the watcher instance
+     * @param nettyStatus the HTTP status code
+     * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+     * @param sendHeaders whether to send HTTP/2 headers frame (only needed 
for first response)
+     */
+    private void sendWatcherResponse(
+            Watcher<HttpContext> watcher,
+            HttpResponseStatus nettyStatus,
+            boolean closeStream,
+            boolean sendHeaders) {
 
-    private void sendWatcherResponse(Watcher<HttpContext> watcher, 
HttpResponseStatus nettyStatus) {
-        String group = watcher.getGroup();
         HttpContext context = watcher.getAsyncContext();
         if (!(context instanceof HttpContext)) {
             logger.warn(
                     "Unsupported context type for watcher on group {}: {}",
-                    group,
+                    watcher.getGroup(),
                     context != null ? context.getClass().getName() : "null");
             return;
         }

Review Comment:
   The check on line 225 `if (!(context instanceof HttpContext))` is redundant 
since `context` is already declared as type `HttpContext` on line 224. This 
condition will always be false (unless there's some unexpected classloader 
issue). Consider removing this check or clarifying what the intended validation 
is.



##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -131,70 +129,152 @@ public void run() {
                     throw new RuntimeException(e);
                 }
                 ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
-                        .publishEvent(new ClusterChangeEvent(this, 
"default-test", 2, true));
+                        .publishEvent(new ClusterChangeEvent(this, 
"default-test-group-2", 2, true));
             }
         });
         thread.start();
         try (Response response =
                 HttpClientUtil.doPost("http://127.0.0.1:"; + port + 
"/metadata/v1/watch", param, header, 30000)) {
             if (response != null) {
-                Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+                Assertions.assertEquals(200, response.code());
                 return;
             }
         }
         Assertions.fail();
     }
 
+    /**
+     * Verification points:
+     * 1. Verify HTTP/2 data push continuity: client can continuously receive 
events when server pushes them sequentially
+     * 2. Note: Cannot verify MetadataResponse due to inability to simulate 
real term changes in test environment
+     */
     @Test
-    @Order(5)
-    void watch_withHttp2() throws Exception {
-        CountDownLatch latch = new CountDownLatch(1);
+    @Order(4)
+    void watchStream_http2() throws Exception {
+        Map<String, String> header = new HashMap<>();
+        header.put(HTTP.CONTENT_TYPE, 
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+        Map<String, String> param = new HashMap<>();
+        param.put("default-test-group-3", "1");
+        
+        // Trigger a cluster change event after connection is established
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
 
-        Map<String, String> headers = new HashMap<>();
-        headers.put(HTTP.CONTENT_TYPE, 
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+                ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+                        .publishEvent(new ClusterChangeEvent(this, 
"default-test-group-3", 2, true));
+            }
+        });
+        thread.start();
 
-        Map<String, String> params = new HashMap<>();
-        params.put("default-test", "1");
+        try (SeataHttpWatch<ClusterWatchEvent> watch = 
HttpClientUtil.watchPost(
+                "http://127.0.0.1:"; + port + "/metadata/v1/watch", param, 
header, ClusterWatchEvent.class)) {
+            
+            // Verify HTTP/2 data push continuity: receive first event 
(connection established)
+            SeataHttpWatch.Response<ClusterWatchEvent> firstResponse = 
watch.next();
+            Assertions.assertNotNull(firstResponse.object, "First event data 
should not be null");
+            Assertions.assertEquals(SeataHttpWatch.Response.Type.UPDATE, 
firstResponse.type, "First event should be UPDATE");
+            
+            // Verify HTTP/2 data push continuity: receive second event 
(cluster change event)
+            SeataHttpWatch.Response<ClusterWatchEvent> secondResponse = 
watch.next();
+            Assertions.assertNotNull(secondResponse.object, "Second event data 
should not be null");
+            Assertions.assertEquals(SeataHttpWatch.Response.Type.UPDATE, 
secondResponse.type, "Second event should be UPDATE");
+            Assertions.assertEquals("default-test-group-3", 
secondResponse.object.getGroup(), "Group should match");
+            
+            logger.info("Successfully received two consecutive events from 
server");
+        }
+    }
+
+    /**
+     * Verification points:
+     * 1. Verify HTTP/2 data push continuity: client can continuously receive 
multiple events when server pushes them sequentially
+     * 2. Note: Cannot verify MetadataResponse due to inability to simulate 
real term changes in test environment
+     */
+    @Test
+    @Order(5)
+    void watchMultipleClusterUpdates_http2() throws Exception {
+        Map<String, String> header = new HashMap<>();
+        header.put(HTTP.CONTENT_TYPE, 
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+        Map<String, String> param = new HashMap<>();
+        param.put("default-test-group-4", "1");
 
-        Thread thread = new Thread(() -> {
+        // Trigger multiple cluster change events sequentially
+        Thread triggerThread = new Thread(() -> {
             try {
-                Thread.sleep(2000);
+                Thread.sleep(3000); // Wait for connection to be established
+
+                ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+                        .publishEvent(new ClusterChangeEvent(this, 
"default-test-group-4", 2, true));
+                Thread.sleep(1000);
+
+                ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+                        .publishEvent(new ClusterChangeEvent(this, 
"default-test-group-4", 3, true));
+                Thread.sleep(500);
+
+                ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+                        .publishEvent(new ClusterChangeEvent(this, 
"default-test-group-4", 4, true));
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                Thread.currentThread().interrupt();
             }
-            ((ApplicationEventPublisher) 
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
-                    .publishEvent(new ClusterChangeEvent(this, "default-test", 
2, true));
         });
-        thread.start();
+        triggerThread.start();
 
-        HttpCallback<Response> callback = new HttpCallback<Response>() {
-            @Override
-            public void onSuccess(Response response) {
-                Assertions.assertNotNull(response);
-                Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE, 
response.protocol());
-                Assertions.assertEquals(HttpStatus.SC_OK, response.code());
-                latch.countDown();
-            }
+        boolean firstEventReceived = false;
+        int clusterUpdateCount = 0;
+        long startTime = System.currentTimeMillis();
+        long maxWaitTime = 10000;
+        int expectedUpdateCount = 3;
 
-            @Override
-            public void onFailure(Throwable t) {
-                Assertions.fail("Should not fail: " + t.getMessage());
-            }
+        // Verify HTTP/2 data push continuity: continuously receive multiple 
events
+        try (SeataHttpWatch<ClusterWatchEvent> watch = 
HttpClientUtil.watchPost(
+                "http://127.0.0.1:"; + port + "/metadata/v1/watch", param, 
header, ClusterWatchEvent.class)) {
 
-            @Override
-            public void onCancelled() {
-                Assertions.fail("Should not be cancelled");
+            while (System.currentTimeMillis() - startTime < maxWaitTime && 
clusterUpdateCount < expectedUpdateCount) {
+                try {
+                    if (watch.hasNext()) {
+                        SeataHttpWatch.Response<ClusterWatchEvent> response = 
watch.next();
+                        if (response.type == 
SeataHttpWatch.Response.Type.UPDATE) {
+                            Assertions.assertNotNull(response.object, "Event 
data should not be null");
+                            
+                            if (!firstEventReceived) {
+                                firstEventReceived = true;
+                                logger.info("First event (connection 
established) received");
+                            } else {
+                                clusterUpdateCount++;
+                                Assertions.assertEquals(
+                                        "default-test-group-4", 
response.object.getGroup(), "Group should match");
+                                logger.info("Received cluster update event 
#{}", clusterUpdateCount);
+                            }

Review Comment:
   In the test on line 239, `watch.hasNext()` is called inside a while loop to 
check if there are more events. However, if the stream is blocked waiting for 
data (which is expected for an SSE connection), `hasNext()` might block or 
return false when there's no data yet available. This could lead to the loop 
exiting prematurely or spinning without actually waiting for events. Consider 
using `watch.next()` directly which will block until an event arrives, or 
implement a proper timeout mechanism that doesn't rely on `hasNext()`.
   ```suggestion
                       SeataHttpWatch.Response<ClusterWatchEvent> response = 
watch.next();
                       if (response.type == 
SeataHttpWatch.Response.Type.UPDATE) {
                           Assertions.assertNotNull(response.object, "Event 
data should not be null");
   
                           if (!firstEventReceived) {
                               firstEventReceived = true;
                               logger.info("First event (connection 
established) received");
                           } else {
                               clusterUpdateCount++;
                               Assertions.assertEquals(
                                       "default-test-group-4", 
response.object.getGroup(), "Group should match");
                               logger.info("Received cluster update event #{}", 
clusterUpdateCount);
   ```



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext> 
watcher, HttpResponseStatu
             } else {
                 ctx.writeAndFlush(response);
             }
-        } else {
-            // HTTP/2 response (h2c support)
-            // Send headers frame
+            return;
+        }
+
+        // For HTTP/2, headers must be sent first on the initial response
+        if (sendHeaders) {
             Http2Headers headers = new 
DefaultHttp2Headers().status(nettyStatus.codeAsText());
-            headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+            headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; 
charset=utf-8");
+            headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
             ctx.write(new DefaultHttp2HeadersFrame(headers));
+        }
 
-            // Send empty data frame with endStream=true to close the stream
-            ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, 
true))
-                    .addListener(f -> {
-                        if (!f.isSuccess()) {
-                            logger.warn("HTTP2 response send failed, 
group={}", group, f.cause());
-                        }
-                    });
+        String group = watcher.getGroup();
+        String eventData = buildEventData(group);
+        ByteBuf content = Unpooled.copiedBuffer(eventData, 
StandardCharsets.UTF_8);
+
+        // Send DATA frame (if closeStream is true, it will end the current 
stream)
+        ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+        ctx.flush();
+    }
+
+    /**
+     * Get current cluster metadata for the given group.
+     * This method extracts the logic from ClusterController#cluster to avoid 
circular dependency.
+     *
+     * @param group the group name
+     * @return the MetadataResponse containing current cluster metadata
+     */
+    public MetadataResponse getMetadataResponse(String group) {
+        MetadataResponse metadataResponse = new MetadataResponse();
+        if (StringUtils.isBlank(group)) {
+            group = ConfigurationFactory.getInstance()
+                    .getConfig(ConfigurationKeys.SERVER_RAFT_GROUP, 
DEFAULT_SEATA_GROUP);
+        }
+        RaftServer raftServer = RaftServerManager.getRaftServer(group);
+        if (raftServer != null) {
+            String mode = 
ConfigurationFactory.getInstance().getConfig(STORE_MODE);
+            metadataResponse.setStoreMode(mode);
+            RouteTable routeTable = RouteTable.getInstance();
+            try {
+                
routeTable.refreshLeader(RaftServerManager.getCliClientServiceInstance(), 
group, 1000);
+                PeerId leader = routeTable.selectLeader(group);
+                if (leader != null) {
+                    Set<Node> nodes = new HashSet<>();
+                    RaftClusterMetadata raftClusterMetadata =
+                            
raftServer.getRaftStateMachine().getRaftLeaderMetadata();
+                    Node leaderNode = raftServer
+                            .getRaftStateMachine()
+                            .getRaftLeaderMetadata()
+                            .getLeader();
+                    leaderNode.setGroup(group);
+                    nodes.add(leaderNode);
+                    nodes.addAll(raftClusterMetadata.getLearner());
+                    nodes.addAll(raftClusterMetadata.getFollowers());
+                    metadataResponse.setTerm(raftClusterMetadata.getTerm());
+                    metadataResponse.setNodes(new ArrayList<>(nodes));
+                }
+            } catch (Exception e) {
+                logger.error("Failed to get cluster metadata for group {}: 
{}", group, e.getMessage(), e);
+            }
+        }
+        return metadataResponse;
+    }
+
+    /**
+     * Build event data with simplified format: only group, timestamp, and 
metadata.
+     * For HTTP/2 connections, this will send the full MetadataResponse.
+     *
+     * @param group the group name
+     * @return the event data string with prefix
+     */
+    private String buildEventData(String group) {
+        try {
+            // Get current MetadataResponse
+            MetadataResponse metadataResponse = getMetadataResponse(group);
+
+            // Build simplified JSON: only group, timestamp, and metadata
+            String json = String.format(
+                    "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":%s}",
+                    group,
+                    System.currentTimeMillis(),
+                    OBJECT_MAPPER.writeValueAsString(metadataResponse));
+
+            logger.debug("Sending watch event: group={}, term={}", group, 
metadataResponse.getTerm());
+            return Constants.WATCH_EVENT_PREFIX + json + "\n";

Review Comment:
   The `buildEventData` method on line 328 uses `String.format` with `%s` to 
embed the JSON-serialized metadata directly into another JSON string. If the 
`group` parameter contains special characters like quotes or backslashes, this 
could result in malformed JSON. Consider properly escaping the `group` value or 
using a JSON builder/library to construct the entire JSON object safely rather 
than string formatting.



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext> 
watcher, HttpResponseStatu
             } else {
                 ctx.writeAndFlush(response);
             }
-        } else {
-            // HTTP/2 response (h2c support)
-            // Send headers frame
+            return;
+        }
+
+        // For HTTP/2, headers must be sent first on the initial response
+        if (sendHeaders) {
             Http2Headers headers = new 
DefaultHttp2Headers().status(nettyStatus.codeAsText());
-            headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+            headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; 
charset=utf-8");
+            headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
             ctx.write(new DefaultHttp2HeadersFrame(headers));
+        }
 
-            // Send empty data frame with endStream=true to close the stream
-            ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, 
true))
-                    .addListener(f -> {
-                        if (!f.isSuccess()) {
-                            logger.warn("HTTP2 response send failed, 
group={}", group, f.cause());
-                        }
-                    });
+        String group = watcher.getGroup();
+        String eventData = buildEventData(group);
+        ByteBuf content = Unpooled.copiedBuffer(eventData, 
StandardCharsets.UTF_8);
+
+        // Send DATA frame (if closeStream is true, it will end the current 
stream)
+        ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+        ctx.flush();

Review Comment:
   For HTTP/2 responses, the method sends event data by calling 
`buildEventData(group)` on line 264, which internally calls 
`getMetadataResponse(group)`. This operation involves Raft metadata lookups and 
could potentially fail or be expensive. If this fails or throws an exception 
after headers have been sent (line 260), the stream will be in an inconsistent 
state. Consider adding error handling to ensure that if `buildEventData` fails, 
an appropriate error response or stream closure is performed.
   ```suggestion
           try {
               String eventData = buildEventData(group);
               ByteBuf content = Unpooled.copiedBuffer(eventData, 
StandardCharsets.UTF_8);
   
               // Send DATA frame (if closeStream is true, it will end the 
current stream)
               ctx.write(new DefaultHttp2DataFrame(content, closeStream));
               ctx.flush();
           } catch (Exception e) {
               logger.error("Failed to build or send event data for watcher on 
group {}", group, e);
               // Attempt to send an error event and close the stream to avoid 
leaving it in an inconsistent state
               try {
                   String errorData = "event: error\ndata: internal server 
error\n\n";
                   ByteBuf errorContent = Unpooled.copiedBuffer(errorData, 
StandardCharsets.UTF_8);
                   ctx.write(new DefaultHttp2DataFrame(errorContent, true));
                   ctx.flush();
               } catch (Exception inner) {
                   logger.error("Failed to send error response for watcher on 
group {}", group, inner);
               }
           }
   ```



##########
common/src/main/java/org/apache/seata/common/metadata/ClusterWatchEvent.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.metadata;
+
+/**
+ * Cluster watch event data class.
+ * Simplified format: only contains group, timestamp, and full metadata.
+ * 
+ * <p>Event format:
+ * <pre>
+ * 
{"group":"default","timestamp":1234567890,"metadata":{"nodes":[...],"storeMode":"raft","term":2}}
+ * </pre>
+ * 
+ * <p>Client can determine if update is needed by comparing metadata.term with 
local term.
+ * No need for separate event type field since all events contain full 
metadata.
+ *
+ * @see org.apache.seata.common.util.SeataHttpWatch
+ */
+public class ClusterWatchEvent {
+
+    private String group;
+
+    private Long timestamp;
+
+    private MetadataResponse metadata;
+
+    public ClusterWatchEvent() {}
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public MetadataResponse getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(MetadataResponse metadata) {
+        this.metadata = metadata;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterWatchEvent{" + "group='"
+                + group + '\'' + ", timestamp="
+                + timestamp + ", metadata="
+                + metadata + '}';

Review Comment:
   Default toString(): MetadataResponse inherits toString() from Object, and so 
is not suitable for printing.
   ```suggestion
           String metadataString = metadata == null
                   ? "null"
                   : "MetadataResponse{term=" + metadata.getTerm() + "}";
           return "ClusterWatchEvent{" + "group='"
                   + group + '\'' + ", timestamp="
                   + timestamp + ", metadata="
                   + metadataString + '}';
   ```



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext> 
watcher, HttpResponseStatu
             } else {
                 ctx.writeAndFlush(response);
             }
-        } else {
-            // HTTP/2 response (h2c support)
-            // Send headers frame
+            return;
+        }
+
+        // For HTTP/2, headers must be sent first on the initial response
+        if (sendHeaders) {
             Http2Headers headers = new 
DefaultHttp2Headers().status(nettyStatus.codeAsText());
-            headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+            headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; 
charset=utf-8");
+            headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
             ctx.write(new DefaultHttp2HeadersFrame(headers));
+        }
 
-            // Send empty data frame with endStream=true to close the stream
-            ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, 
true))
-                    .addListener(f -> {
-                        if (!f.isSuccess()) {
-                            logger.warn("HTTP2 response send failed, 
group={}", group, f.cause());
-                        }
-                    });
+        String group = watcher.getGroup();
+        String eventData = buildEventData(group);
+        ByteBuf content = Unpooled.copiedBuffer(eventData, 
StandardCharsets.UTF_8);
+
+        // Send DATA frame (if closeStream is true, it will end the current 
stream)
+        ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+        ctx.flush();
+    }
+
+    /**
+     * Get current cluster metadata for the given group.
+     * This method extracts the logic from ClusterController#cluster to avoid 
circular dependency.
+     *
+     * @param group the group name
+     * @return the MetadataResponse containing current cluster metadata
+     */
+    public MetadataResponse getMetadataResponse(String group) {
+        MetadataResponse metadataResponse = new MetadataResponse();
+        if (StringUtils.isBlank(group)) {
+            group = ConfigurationFactory.getInstance()
+                    .getConfig(ConfigurationKeys.SERVER_RAFT_GROUP, 
DEFAULT_SEATA_GROUP);
+        }
+        RaftServer raftServer = RaftServerManager.getRaftServer(group);
+        if (raftServer != null) {
+            String mode = 
ConfigurationFactory.getInstance().getConfig(STORE_MODE);
+            metadataResponse.setStoreMode(mode);
+            RouteTable routeTable = RouteTable.getInstance();
+            try {
+                
routeTable.refreshLeader(RaftServerManager.getCliClientServiceInstance(), 
group, 1000);
+                PeerId leader = routeTable.selectLeader(group);
+                if (leader != null) {
+                    Set<Node> nodes = new HashSet<>();
+                    RaftClusterMetadata raftClusterMetadata =
+                            
raftServer.getRaftStateMachine().getRaftLeaderMetadata();
+                    Node leaderNode = raftServer
+                            .getRaftStateMachine()
+                            .getRaftLeaderMetadata()
+                            .getLeader();
+                    leaderNode.setGroup(group);
+                    nodes.add(leaderNode);
+                    nodes.addAll(raftClusterMetadata.getLearner());
+                    nodes.addAll(raftClusterMetadata.getFollowers());
+                    metadataResponse.setTerm(raftClusterMetadata.getTerm());
+                    metadataResponse.setNodes(new ArrayList<>(nodes));
+                }
+            } catch (Exception e) {
+                logger.error("Failed to get cluster metadata for group {}: 
{}", group, e.getMessage(), e);
+            }
+        }
+        return metadataResponse;
+    }
+
+    /**
+     * Build event data with simplified format: only group, timestamp, and 
metadata.
+     * For HTTP/2 connections, this will send the full MetadataResponse.
+     *
+     * @param group the group name
+     * @return the event data string with prefix
+     */
+    private String buildEventData(String group) {
+        try {
+            // Get current MetadataResponse
+            MetadataResponse metadataResponse = getMetadataResponse(group);
+
+            // Build simplified JSON: only group, timestamp, and metadata
+            String json = String.format(
+                    "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":%s}",
+                    group,
+                    System.currentTimeMillis(),
+                    OBJECT_MAPPER.writeValueAsString(metadataResponse));
+
+            logger.debug("Sending watch event: group={}, term={}", group, 
metadataResponse.getTerm());
+            return Constants.WATCH_EVENT_PREFIX + json + "\n";
+        } catch (JsonProcessingException e) {
+            logger.error("Failed to serialize MetadataResponse for group {}: 
{}", group, e.getMessage(), e);
+            // Fallback: send minimal data
+            String json = String.format(
+                    "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":null}",
+                    group, System.currentTimeMillis());
+            return Constants.WATCH_EVENT_PREFIX + json + "\n";
         }
     }
 
     public void registryWatcher(Watcher<HttpContext> watcher) {
         String group = watcher.getGroup();
         Long term = GROUP_UPDATE_TERM.get(group);
-        if (term == null || watcher.getTerm() >= term) {
-            WATCHERS.computeIfAbsent(group, value -> new 
ConcurrentLinkedQueue<>())
+        HttpContext context = watcher.getAsyncContext();
+        boolean isHttp2 = context.isHttp2();
+
+        // For HTTP/2, must send response headers immediately, cannot delay
+        if (isHttp2 && !HTTP2_HEADERS_SENT.getOrDefault(watcher, false)) {
+            sendWatcherResponse(watcher, HttpResponseStatus.OK, false, true);
+            HTTP2_HEADERS_SENT.put(watcher, true);
+        }
+
+        if (isHttp2) {
+            HTTP2_WATCHERS
+                    .computeIfAbsent(group, value -> new 
ConcurrentLinkedQueue<>())
                     .add(watcher);
+
+            // If term has been updated, notify immediately
+            if (term != null && term > watcher.getTerm()) {
+                notifyWatcher(watcher, null);

Review Comment:
   There's a potential race condition between lines 353-356 in 
`registryWatcher` and lines 200-204 in `notifyWatcher`. If `registryWatcher` is 
called for an HTTP/2 watcher and simultaneously `notifyWatcher` is called (line 
365), both might check `HTTP2_HEADERS_SENT.getOrDefault(watcher, false)` and 
find it false, causing headers to potentially be sent twice. While 
ConcurrentHashMap prevents data corruption, the logical race could result in 
duplicate header frames being sent. Consider using `computeIfAbsent` or 
synchronized blocks to ensure atomicity of the check-and-set operation.



##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to 
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+        implements Iterator<SeataHttpWatch.Response<T>>, 
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataHttpWatch.class);
+
+    private final ResponseBody responseBody;
+    private final BufferedSource source;
+    private final Call call;
+    private final ObjectMapper objectMapper;
+    private final Class<T> eventType;
+
+    /**
+     * Response wrapper containing event data.
+     * Since event format is simplified (no type field), all successful events 
are treated the same.
+     * Client can determine the nature of the event by comparing metadata.term.
+     */
+    public static class Response<T> {
+        /**
+         * Response type enum
+         */
+        public enum Type {
+            /**
+             * Normal event with data (connection established or cluster 
changed)
+             */
+            UPDATE,
+            /**
+             * Error event (parse error or connection error)
+             */
+            ERROR
+        }
+
+        public final Type type;
+
+        public final T object;
+
+        public Response(Type type, T object) {
+            this.type = type;
+            this.object = object;
+        }
+    }
+
+    /**
+     * Create a Watch instance from an OkHttp call
+     *
+     * @param call      the prepared HTTP call
+     * @param eventType the class type for deserializing event data
+     * @param <T>       the event data type
+     * @return a Watch instance
+     * @throws IOException if the request fails
+     */
+    public static <T> SeataHttpWatch<T> createWatch(Call call, Class<T> 
eventType) throws IOException {
+
+        okhttp3.Response response = call.execute();
+
+        if (!response.isSuccessful()) {
+            String respBody = null;
+            try (ResponseBody body = response.body()) {
+                if (body != null) {
+                    respBody = body.string();
+                }
+            } catch (IOException e) {
+                throw new FrameworkException(e, "Watch request failed: " + 
response.message());
+            }
+            throw new FrameworkException(
+                    String.format("Watch request failed with code %d: %s", 
response.code(), respBody));
+        }
+
+        // Verify Content-Type is event stream
+        String contentType = response.header("Content-Type");
+        if (contentType == null || !contentType.contains("text/event-stream")) 
{
+            LOGGER.warn("Expected Content-Type: text/event-stream, got: {}", 
contentType);
+        }
+
+        return new SeataHttpWatch<>(response.body(), call, eventType);

Review Comment:
   The `createWatch` method on line 90 executes the HTTP call and creates a 
SeataHttpWatch instance with the response body. However, if an exception occurs 
after line 92 but before line 113, the response object is never closed, leading 
to a resource leak. The response should be closed in a finally block or using 
try-with-resources to ensure proper cleanup, especially since the 
response.body() is passed to the SeataHttpWatch constructor.
   ```suggestion
           boolean watchCreated = false;
           try {
   
               if (!response.isSuccessful()) {
                   String respBody = null;
                   try (ResponseBody body = response.body()) {
                       if (body != null) {
                           respBody = body.string();
                       }
                   } catch (IOException e) {
                       throw new FrameworkException(e, "Watch request failed: " 
+ response.message());
                   }
                   throw new FrameworkException(
                           String.format("Watch request failed with code %d: 
%s", response.code(), respBody));
               }
   
               // Verify Content-Type is event stream
               String contentType = response.header("Content-Type");
               if (contentType == null || 
!contentType.contains("text/event-stream")) {
                   LOGGER.warn("Expected Content-Type: text/event-stream, got: 
{}", contentType);
               }
   
               SeataHttpWatch<T> watch = new SeataHttpWatch<>(response.body(), 
call, eventType);
               watchCreated = true;
               return watch;
           } finally {
               if (!watchCreated) {
                   ResponseBody body = response.body();
                   if (body != null) {
                       body.close();
                   } else {
                       response.close();
                   }
               }
           }
   ```



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
     @Async
     public void onChangeEvent(ClusterChangeEvent event) {
         if (event.getTerm() > 0) {
-            GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
-            // Notifications are made of changes in cluster information
-            Optional.ofNullable(WATCHERS.remove(event.getGroup()))
-                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(this::notifyWatcher));
+            String group = event.getGroup();
+            Long eventTerm = event.getTerm();
+
+            Long currentTerm = GROUP_UPDATE_TERM.get(group);
+            if (currentTerm != null && eventTerm <= currentTerm) {
+                logger.info(
+                        "Discarding outdated event with term {} for group {}, 
current term is {}",
+                        eventTerm,
+                        group,
+                        currentTerm);
+                return;
+            }
+
+            GROUP_UPDATE_TERM.put(group, eventTerm);
+
+            // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+            Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(watcher -> {
+                        notifyWatcher(watcher, eventTerm);
+                        // HTTP/1.1 watcher is done after notification
+                        watcher.setDone(true);
+                    }));
+
+            // Handle HTTP/2 watchers: notify without removing (long-lived 
connection)
+            Queue<Watcher<HttpContext>> http2Watchers = 
HTTP2_WATCHERS.get(group);
+            if (http2Watchers != null && !http2Watchers.isEmpty()) {
+                List<Watcher<HttpContext>> watchersToNotify = new 
ArrayList<>(http2Watchers);
+                watchersToNotify.forEach(watcher -> {
+                    if 
(watcher.getAsyncContext().getContext().channel().isActive() && 
!watcher.isDone()) {
+                        if (eventTerm > watcher.getTerm()) {
+                            notifyWatcher(watcher, eventTerm);
+                        } else {
+                            logger.info(
+                                    "Skipping notification for watcher with 
term {} >= event term {} for group {}",
+                                    watcher.getTerm(),
+                                    eventTerm,
+                                    group);
+                        }

Review Comment:
   The logger.info statement on lines 176-180 uses a positional argument format 
but the condition on line 173 compares `eventTerm > watcher.getTerm()`, meaning 
this log will only execute when the condition is false (i.e., when `eventTerm 
<= watcher.getTerm()`). However, the log message says "watcher with term {} >= 
event term {}", which would be confusing. The message should be adjusted to 
clarify it's logging when the notification is skipped because the watcher 
already has an equal or newer term.



##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +215,101 @@ private static String buildUrlWithParams(String url, 
Map<String, String> params)
         return urlBuilder.toString();
     }
 
-    private static void executeAsync(OkHttpClient client, Request request, 
final HttpCallback<Response> callback) {
-        client.newCall(request).enqueue(new Callback() {
-            @Override
-            public void onResponse(Call call, Response response) {
-                try {
-                    callback.onSuccess(response);
-                } finally {
-                    response.close();
-                }
-            }
+    /**
+     * Create an HTTP/2 client for watch connections.
+     * This client is configured for long-lived connections to receive 
Server-Sent Events (SSE).
+     * The client instances are cached and reused based on the connection 
timeout to improve performance.
+     *
+     * @param connectTimeoutSeconds connection timeout in seconds (fast 
failure if server is unreachable)
+     * @return configured OkHttpClient instance (cached and reused)
+     */
+    private static OkHttpClient createHttp2WatchClient(int 
connectTimeoutSeconds) {
+        return HTTP2_CLIENT_MAP.computeIfAbsent(connectTimeoutSeconds, k -> 
new OkHttpClient.Builder()
+                
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+                // Fast failure during connection phase
+                .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+                // Infinite read timeout to allow continuous listening for 
server push
+                .readTimeout(0, TimeUnit.SECONDS)
+                .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+                .build());

Review Comment:
   The HTTP2_CLIENT_MAP uses `connectTimeoutSeconds` as the key (line 227), but 
the only place this is called is with the constant 
`HTTP2_WATCH_CONNECT_TIMEOUT_SECONDS` which is 30 seconds. This means there 
will only ever be one entry in this map. Consider whether the caching logic 
with a map is necessary, or if a simple static field would be more appropriate. 
The comment on line 221 mentions "cached and reused based on the connection 
timeout" but in practice, the timeout never varies.
   ```suggestion
        * Singleton HTTP/2 client for watch connections.
        * This client is configured for long-lived connections to receive 
Server-Sent Events (SSE)
        * and is cached and reused to avoid repeatedly creating identical 
clients.
        */
       private static volatile OkHttpClient HTTP2_WATCH_CLIENT;
   
       /**
        * Create (on first use) or retrieve the HTTP/2 client for watch 
connections.
        * <p>
        * The first invocation determines the connection timeout used to 
configure the client; subsequent
        * calls return the same cached instance, regardless of the timeout 
value passed in.
        *
        * @param connectTimeoutSeconds connection timeout in seconds (fast 
failure if server is unreachable)
        * @return configured OkHttpClient instance (cached and reused)
        */
       private static OkHttpClient createHttp2WatchClient(int 
connectTimeoutSeconds) {
           OkHttpClient client = HTTP2_WATCH_CLIENT;
           if (client == null) {
               synchronized (HttpClientUtil.class) {
                   client = HTTP2_WATCH_CLIENT;
                   if (client == null) {
                       client = new OkHttpClient.Builder()
                               
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
                               // Fast failure during connection phase
                               .connectTimeout(connectTimeoutSeconds, 
TimeUnit.SECONDS)
                               // Infinite read timeout to allow continuous 
listening for server push
                               .readTimeout(0, TimeUnit.SECONDS)
                               .writeTimeout(connectTimeoutSeconds, 
TimeUnit.SECONDS)
                               .build();
                       HTTP2_WATCH_CLIENT = client;
                   }
               }
           }
           return client;
   ```



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
     @Async
     public void onChangeEvent(ClusterChangeEvent event) {
         if (event.getTerm() > 0) {
-            GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
-            // Notifications are made of changes in cluster information
-            Optional.ofNullable(WATCHERS.remove(event.getGroup()))
-                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(this::notifyWatcher));
+            String group = event.getGroup();
+            Long eventTerm = event.getTerm();

Review Comment:
   The variable 'eventTerm' is only assigned values of primitive type and is 
never 'null', but it is declared with the boxed type 'Long'.
   ```suggestion
               long eventTerm = event.getTerm();
   ```



##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to 
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+        implements Iterator<SeataHttpWatch.Response<T>>, 
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataHttpWatch.class);
+
+    private final ResponseBody responseBody;
+    private final BufferedSource source;
+    private final Call call;
+    private final ObjectMapper objectMapper;
+    private final Class<T> eventType;
+
+    /**
+     * Response wrapper containing event data.
+     * Since event format is simplified (no type field), all successful events 
are treated the same.
+     * Client can determine the nature of the event by comparing metadata.term.
+     */
+    public static class Response<T> {
+        /**
+         * Response type enum
+         */
+        public enum Type {
+            /**
+             * Normal event with data (connection established or cluster 
changed)
+             */
+            UPDATE,
+            /**
+             * Error event (parse error or connection error)
+             */
+            ERROR
+        }
+
+        public final Type type;
+
+        public final T object;
+
+        public Response(Type type, T object) {
+            this.type = type;
+            this.object = object;
+        }
+    }
+
+    /**
+     * Create a Watch instance from an OkHttp call
+     *
+     * @param call      the prepared HTTP call
+     * @param eventType the class type for deserializing event data
+     * @param <T>       the event data type
+     * @return a Watch instance
+     * @throws IOException if the request fails
+     */
+    public static <T> SeataHttpWatch<T> createWatch(Call call, Class<T> 
eventType) throws IOException {
+
+        okhttp3.Response response = call.execute();
+
+        if (!response.isSuccessful()) {
+            String respBody = null;
+            try (ResponseBody body = response.body()) {
+                if (body != null) {
+                    respBody = body.string();
+                }
+            } catch (IOException e) {
+                throw new FrameworkException(e, "Watch request failed: " + 
response.message());
+            }
+            throw new FrameworkException(
+                    String.format("Watch request failed with code %d: %s", 
response.code(), respBody));
+        }
+
+        // Verify Content-Type is event stream
+        String contentType = response.header("Content-Type");
+        if (contentType == null || !contentType.contains("text/event-stream")) 
{
+            LOGGER.warn("Expected Content-Type: text/event-stream, got: {}", 
contentType);
+        }
+
+        return new SeataHttpWatch<>(response.body(), call, eventType);
+    }
+
+    /**
+     * Create a Watch instance with a prepared request
+     *
+     * @param client    the OkHttpClient instance
+     * @param request   the HTTP request
+     * @param eventType the class type for deserializing event data
+     * @param <T>       the event data type
+     * @return a Watch instance
+     * @throws IOException if the request fails
+     */
+    public static <T> SeataHttpWatch<T> createWatch(OkHttpClient client, 
Request request, Class<T> eventType)
+            throws IOException {
+
+        Call call = client.newCall(request);
+        return createWatch(call, eventType);
+    }
+
+    private SeataHttpWatch(ResponseBody responseBody, Call call, Class<T> 
eventType) {
+        this.responseBody = responseBody;
+        this.source = responseBody.source();
+        this.call = call;
+        this.objectMapper = new ObjectMapper();
+        this.eventType = eventType;
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            // Check if source is exhausted (stream closed)
+            return !source.exhausted();
+        } catch (IOException e) {
+            LOGGER.error("Error checking if stream has more data", e);
+            return false;
+        }
+    }
+
+    @Override
+    public Response<T> next() {
+        try {
+            /*
+            Read a single line and parse it as an event.
+            Format: "{prefix}{json}\n" where prefix is defined in 
Constants.WATCH_EVENT_PREFIX.
+            Each line is a complete event, event type is included in the JSON 
data.
+            */
+            String line = source.readUtf8Line();
+            if (line == null) {
+                throw new RuntimeException("Stream closed unexpectedly");
+            }
+
+            if (!line.startsWith(Constants.WATCH_EVENT_PREFIX)) {
+                throw new RuntimeException("Invalid event format: expected 
prefix '" + Constants.WATCH_EVENT_PREFIX
+                        + "', got: " + (line.length() > 20 ? line.substring(0, 
20) + "..." : line));
+            }
+
+            String jsonData = 
line.substring(Constants.WATCH_EVENT_PREFIX.length());
+            return parseEvent(jsonData);
+
+        } catch (IOException e) {
+            throw new RuntimeException("IO Exception during next()", e);
+        }
+    }
+
+    /**
+     * Parse event JSON into Response object.
+     * Simplified format: only contains group, timestamp, and metadata fields.
+     *
+     * @param json the JSON string to parse
+     * @return the parsed Response object
+     * @throws IOException if parsing fails
+     */
+    private Response<T> parseEvent(String json) throws IOException {
+        try {
+            T eventData = objectMapper.readValue(json, eventType);
+            return new Response<>(Response.Type.UPDATE, eventData);
+
+        } catch (Exception e) {
+            LOGGER.error("Failed to parse event JSON: {}", json, e);
+            // Return error response
+            return new Response<>(Response.Type.ERROR, null);
+        }

Review Comment:
   The `parseEvent` method catches a broad `Exception` on line 191, which 
includes both checked and unchecked exceptions. This could potentially catch 
and suppress serious errors like `OutOfMemoryError` or `StackOverflowError`. 
Consider catching only `JsonProcessingException` or `IOException` to handle 
parsing failures specifically, while allowing critical JVM errors to propagate.



##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
     @Async
     public void onChangeEvent(ClusterChangeEvent event) {
         if (event.getTerm() > 0) {
-            GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
-            // Notifications are made of changes in cluster information
-            Optional.ofNullable(WATCHERS.remove(event.getGroup()))
-                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(this::notifyWatcher));
+            String group = event.getGroup();
+            Long eventTerm = event.getTerm();
+
+            Long currentTerm = GROUP_UPDATE_TERM.get(group);
+            if (currentTerm != null && eventTerm <= currentTerm) {
+                logger.info(
+                        "Discarding outdated event with term {} for group {}, 
current term is {}",
+                        eventTerm,
+                        group,
+                        currentTerm);
+                return;
+            }
+
+            GROUP_UPDATE_TERM.put(group, eventTerm);
+
+            // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+            Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+                    .ifPresent(watchers -> 
watchers.parallelStream().forEach(watcher -> {
+                        notifyWatcher(watcher, eventTerm);
+                        // HTTP/1.1 watcher is done after notification
+                        watcher.setDone(true);
+                    }));
+
+            // Handle HTTP/2 watchers: notify without removing (long-lived 
connection)
+            Queue<Watcher<HttpContext>> http2Watchers = 
HTTP2_WATCHERS.get(group);
+            if (http2Watchers != null && !http2Watchers.isEmpty()) {
+                List<Watcher<HttpContext>> watchersToNotify = new 
ArrayList<>(http2Watchers);
+                watchersToNotify.forEach(watcher -> {
+                    if 
(watcher.getAsyncContext().getContext().channel().isActive() && 
!watcher.isDone()) {
+                        if (eventTerm > watcher.getTerm()) {
+                            notifyWatcher(watcher, eventTerm);
+                        } else {
+                            logger.info(
+                                    "Skipping notification for watcher with 
term {} >= event term {} for group {}",
+                                    watcher.getTerm(),
+                                    eventTerm,
+                                    group);
+                        }
+                    } else {
+                        // Remove inactive watcher
+                        http2Watchers.remove(watcher);
+                        HTTP2_HEADERS_SENT.remove(watcher);
+                    }
+                });

Review Comment:
   The iteration through `watchersToNotify` checks if watchers are active and 
not done, but on line 184, it attempts to remove from the original 
`http2Watchers` queue. Since `watchersToNotify` is a snapshot copy created on 
line 170, the `watcher` reference in the forEach on line 171 might not 
successfully remove the correct element from `http2Watchers` in all cases, 
especially if the queue has been modified concurrently. Consider tracking 
watchers to remove and performing a batch removal after the iteration, or using 
an iterator on the original queue with proper synchronization.



##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to 
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+        implements Iterator<SeataHttpWatch.Response<T>>, 
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {

Review Comment:
   This Iterable is its own Iterator, but does not guard against multiple 
iterations.
   ```suggestion
           implements Iterator<SeataHttpWatch.Response<T>>, AutoCloseable {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to