Copilot commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2735045941
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
@Async
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
- GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
- Optional.ofNullable(WATCHERS.remove(event.getGroup()))
- .ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
+ String group = event.getGroup();
+ Long eventTerm = event.getTerm();
+
+ Long currentTerm = GROUP_UPDATE_TERM.get(group);
+ if (currentTerm != null && eventTerm <= currentTerm) {
+ logger.info(
+ "Discarding outdated event with term {} for group {},
current term is {}",
+ eventTerm,
+ group,
+ currentTerm);
+ return;
+ }
+
+ GROUP_UPDATE_TERM.put(group, eventTerm);
+
+ // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+ Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+ .ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
+ notifyWatcher(watcher, eventTerm);
+ // HTTP/1.1 watcher is done after notification
+ watcher.setDone(true);
+ }));
+
+ // Handle HTTP/2 watchers: notify without removing (long-lived
connection)
+ Queue<Watcher<HttpContext>> http2Watchers =
HTTP2_WATCHERS.get(group);
+ if (http2Watchers != null && !http2Watchers.isEmpty()) {
+ List<Watcher<HttpContext>> watchersToNotify = new
ArrayList<>(http2Watchers);
+ watchersToNotify.forEach(watcher -> {
+ if
(watcher.getAsyncContext().getContext().channel().isActive() &&
!watcher.isDone()) {
+ if (eventTerm > watcher.getTerm()) {
+ notifyWatcher(watcher, eventTerm);
+ } else {
+ logger.info(
+ "Skipping notification for watcher with
term {} >= event term {} for group {}",
+ watcher.getTerm(),
+ eventTerm,
+ group);
+ }
+ } else {
+ // Remove inactive watcher
+ http2Watchers.remove(watcher);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ }
+ });
+ }
}
}
- private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
+ private void notifyWatcher(Watcher<HttpContext> watcher, Long eventTerm) {
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context.isHttp2();
+
+ if (!isHttp2) {
+ watcher.setDone(true);
+ }
+
+ boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher,
false);
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false,
isFirstResponse);
+ if (isFirstResponse && isHttp2) {
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
+
+ if (eventTerm != null && eventTerm > watcher.getTerm()) {
+ watcher.setTerm(eventTerm);
+ }
}
+ /**
+ * Send watcher response to the client.
+ *
+ * @param watcher the watcher instance
+ * @param nettyStatus the HTTP status code
+ * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+ * @param sendHeaders whether to send HTTP/2 headers frame (only needed
for first response)
+ */
+ private void sendWatcherResponse(
+ Watcher<HttpContext> watcher,
+ HttpResponseStatus nettyStatus,
+ boolean closeStream,
+ boolean sendHeaders) {
- private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
- String group = watcher.getGroup();
HttpContext context = watcher.getAsyncContext();
if (!(context instanceof HttpContext)) {
logger.warn(
"Unsupported context type for watcher on group {}: {}",
- group,
+ watcher.getGroup(),
context != null ? context.getClass().getName() : "null");
return;
}
Review Comment:
The check on line 225 `if (!(context instanceof HttpContext))` is redundant
since `context` is already declared as type `HttpContext` on line 224. This
condition will always be false (unless there's some unexpected classloader
issue). Consider removing this check or clarifying what the intended validation
is.
##########
server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java:
##########
@@ -131,70 +129,152 @@ public void run() {
throw new RuntimeException(e);
}
((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .publishEvent(new ClusterChangeEvent(this,
"default-test", 2, true));
+ .publishEvent(new ClusterChangeEvent(this,
"default-test-group-2", 2, true));
}
});
thread.start();
try (Response response =
HttpClientUtil.doPost("http://127.0.0.1:" + port +
"/metadata/v1/watch", param, header, 30000)) {
if (response != null) {
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+ Assertions.assertEquals(200, response.code());
return;
}
}
Assertions.fail();
}
+ /**
+ * Verification points:
+ * 1. Verify HTTP/2 data push continuity: client can continuously receive
events when server pushes them sequentially
+ * 2. Note: Cannot verify MetadataResponse due to inability to simulate
real term changes in test environment
+ */
@Test
- @Order(5)
- void watch_withHttp2() throws Exception {
- CountDownLatch latch = new CountDownLatch(1);
+ @Order(4)
+ void watchStream_http2() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test-group-3", "1");
+
+ // Trigger a cluster change event after connection is established
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
- Map<String, String> headers = new HashMap<>();
- headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test-group-3", 2, true));
+ }
+ });
+ thread.start();
- Map<String, String> params = new HashMap<>();
- params.put("default-test", "1");
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
+
+ // Verify HTTP/2 data push continuity: receive first event
(connection established)
+ SeataHttpWatch.Response<ClusterWatchEvent> firstResponse =
watch.next();
+ Assertions.assertNotNull(firstResponse.object, "First event data
should not be null");
+ Assertions.assertEquals(SeataHttpWatch.Response.Type.UPDATE,
firstResponse.type, "First event should be UPDATE");
+
+ // Verify HTTP/2 data push continuity: receive second event
(cluster change event)
+ SeataHttpWatch.Response<ClusterWatchEvent> secondResponse =
watch.next();
+ Assertions.assertNotNull(secondResponse.object, "Second event data
should not be null");
+ Assertions.assertEquals(SeataHttpWatch.Response.Type.UPDATE,
secondResponse.type, "Second event should be UPDATE");
+ Assertions.assertEquals("default-test-group-3",
secondResponse.object.getGroup(), "Group should match");
+
+ logger.info("Successfully received two consecutive events from
server");
+ }
+ }
+
+ /**
+ * Verification points:
+ * 1. Verify HTTP/2 data push continuity: client can continuously receive
multiple events when server pushes them sequentially
+ * 2. Note: Cannot verify MetadataResponse due to inability to simulate
real term changes in test environment
+ */
+ @Test
+ @Order(5)
+ void watchMultipleClusterUpdates_http2() throws Exception {
+ Map<String, String> header = new HashMap<>();
+ header.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+ Map<String, String> param = new HashMap<>();
+ param.put("default-test-group-4", "1");
- Thread thread = new Thread(() -> {
+ // Trigger multiple cluster change events sequentially
+ Thread triggerThread = new Thread(() -> {
try {
- Thread.sleep(2000);
+ Thread.sleep(3000); // Wait for connection to be established
+
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test-group-4", 2, true));
+ Thread.sleep(1000);
+
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test-group-4", 3, true));
+ Thread.sleep(500);
+
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this,
"default-test-group-4", 4, true));
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ Thread.currentThread().interrupt();
}
- ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .publishEvent(new ClusterChangeEvent(this, "default-test",
2, true));
});
- thread.start();
+ triggerThread.start();
- HttpCallback<Response> callback = new HttpCallback<Response>() {
- @Override
- public void onSuccess(Response response) {
- Assertions.assertNotNull(response);
- Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
- Assertions.assertEquals(HttpStatus.SC_OK, response.code());
- latch.countDown();
- }
+ boolean firstEventReceived = false;
+ int clusterUpdateCount = 0;
+ long startTime = System.currentTimeMillis();
+ long maxWaitTime = 10000;
+ int expectedUpdateCount = 3;
- @Override
- public void onFailure(Throwable t) {
- Assertions.fail("Should not fail: " + t.getMessage());
- }
+ // Verify HTTP/2 data push continuity: continuously receive multiple
events
+ try (SeataHttpWatch<ClusterWatchEvent> watch =
HttpClientUtil.watchPost(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", param,
header, ClusterWatchEvent.class)) {
- @Override
- public void onCancelled() {
- Assertions.fail("Should not be cancelled");
+ while (System.currentTimeMillis() - startTime < maxWaitTime &&
clusterUpdateCount < expectedUpdateCount) {
+ try {
+ if (watch.hasNext()) {
+ SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
+ if (response.type ==
SeataHttpWatch.Response.Type.UPDATE) {
+ Assertions.assertNotNull(response.object, "Event
data should not be null");
+
+ if (!firstEventReceived) {
+ firstEventReceived = true;
+ logger.info("First event (connection
established) received");
+ } else {
+ clusterUpdateCount++;
+ Assertions.assertEquals(
+ "default-test-group-4",
response.object.getGroup(), "Group should match");
+ logger.info("Received cluster update event
#{}", clusterUpdateCount);
+ }
Review Comment:
In the test on line 239, `watch.hasNext()` is called inside a while loop to
check if there are more events. However, if the stream is blocked waiting for
data (which is expected for an SSE connection), `hasNext()` might block or
return false when there's no data yet available. This could lead to the loop
exiting prematurely or spinning without actually waiting for events. Consider
using `watch.next()` directly which will block until an event arrives, or
implement a proper timeout mechanism that doesn't rely on `hasNext()`.
```suggestion
SeataHttpWatch.Response<ClusterWatchEvent> response =
watch.next();
if (response.type ==
SeataHttpWatch.Response.Type.UPDATE) {
Assertions.assertNotNull(response.object, "Event
data should not be null");
if (!firstEventReceived) {
firstEventReceived = true;
logger.info("First event (connection
established) received");
} else {
clusterUpdateCount++;
Assertions.assertEquals(
"default-test-group-4",
response.object.getGroup(), "Group should match");
logger.info("Received cluster update event #{}",
clusterUpdateCount);
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // For HTTP/2, headers must be sent first on the initial response
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ String group = watcher.getGroup();
+ String eventData = buildEventData(group);
+ ByteBuf content = Unpooled.copiedBuffer(eventData,
StandardCharsets.UTF_8);
+
+ // Send DATA frame (if closeStream is true, it will end the current
stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ /**
+ * Get current cluster metadata for the given group.
+ * This method extracts the logic from ClusterController#cluster to avoid
circular dependency.
+ *
+ * @param group the group name
+ * @return the MetadataResponse containing current cluster metadata
+ */
+ public MetadataResponse getMetadataResponse(String group) {
+ MetadataResponse metadataResponse = new MetadataResponse();
+ if (StringUtils.isBlank(group)) {
+ group = ConfigurationFactory.getInstance()
+ .getConfig(ConfigurationKeys.SERVER_RAFT_GROUP,
DEFAULT_SEATA_GROUP);
+ }
+ RaftServer raftServer = RaftServerManager.getRaftServer(group);
+ if (raftServer != null) {
+ String mode =
ConfigurationFactory.getInstance().getConfig(STORE_MODE);
+ metadataResponse.setStoreMode(mode);
+ RouteTable routeTable = RouteTable.getInstance();
+ try {
+
routeTable.refreshLeader(RaftServerManager.getCliClientServiceInstance(),
group, 1000);
+ PeerId leader = routeTable.selectLeader(group);
+ if (leader != null) {
+ Set<Node> nodes = new HashSet<>();
+ RaftClusterMetadata raftClusterMetadata =
+
raftServer.getRaftStateMachine().getRaftLeaderMetadata();
+ Node leaderNode = raftServer
+ .getRaftStateMachine()
+ .getRaftLeaderMetadata()
+ .getLeader();
+ leaderNode.setGroup(group);
+ nodes.add(leaderNode);
+ nodes.addAll(raftClusterMetadata.getLearner());
+ nodes.addAll(raftClusterMetadata.getFollowers());
+ metadataResponse.setTerm(raftClusterMetadata.getTerm());
+ metadataResponse.setNodes(new ArrayList<>(nodes));
+ }
+ } catch (Exception e) {
+ logger.error("Failed to get cluster metadata for group {}:
{}", group, e.getMessage(), e);
+ }
+ }
+ return metadataResponse;
+ }
+
+ /**
+ * Build event data with simplified format: only group, timestamp, and
metadata.
+ * For HTTP/2 connections, this will send the full MetadataResponse.
+ *
+ * @param group the group name
+ * @return the event data string with prefix
+ */
+ private String buildEventData(String group) {
+ try {
+ // Get current MetadataResponse
+ MetadataResponse metadataResponse = getMetadataResponse(group);
+
+ // Build simplified JSON: only group, timestamp, and metadata
+ String json = String.format(
+ "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":%s}",
+ group,
+ System.currentTimeMillis(),
+ OBJECT_MAPPER.writeValueAsString(metadataResponse));
+
+ logger.debug("Sending watch event: group={}, term={}", group,
metadataResponse.getTerm());
+ return Constants.WATCH_EVENT_PREFIX + json + "\n";
Review Comment:
The `buildEventData` method on line 328 uses `String.format` with `%s` to
embed the JSON-serialized metadata directly into another JSON string. If the
`group` parameter contains special characters like quotes or backslashes, this
could result in malformed JSON. Consider properly escaping the `group` value or
using a JSON builder/library to construct the entire JSON object safely rather
than string formatting.
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // For HTTP/2, headers must be sent first on the initial response
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ String group = watcher.getGroup();
+ String eventData = buildEventData(group);
+ ByteBuf content = Unpooled.copiedBuffer(eventData,
StandardCharsets.UTF_8);
+
+ // Send DATA frame (if closeStream is true, it will end the current
stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
Review Comment:
For HTTP/2 responses, the method sends event data by calling
`buildEventData(group)` on line 264, which internally calls
`getMetadataResponse(group)`. This operation involves Raft metadata lookups and
could potentially fail or be expensive. If this fails or throws an exception
after headers have been sent (line 260), the stream will be in an inconsistent
state. Consider adding error handling to ensure that if `buildEventData` fails,
an appropriate error response or stream closure is performed.
```suggestion
try {
String eventData = buildEventData(group);
ByteBuf content = Unpooled.copiedBuffer(eventData,
StandardCharsets.UTF_8);
// Send DATA frame (if closeStream is true, it will end the
current stream)
ctx.write(new DefaultHttp2DataFrame(content, closeStream));
ctx.flush();
} catch (Exception e) {
logger.error("Failed to build or send event data for watcher on
group {}", group, e);
// Attempt to send an error event and close the stream to avoid
leaving it in an inconsistent state
try {
String errorData = "event: error\ndata: internal server
error\n\n";
ByteBuf errorContent = Unpooled.copiedBuffer(errorData,
StandardCharsets.UTF_8);
ctx.write(new DefaultHttp2DataFrame(errorContent, true));
ctx.flush();
} catch (Exception inner) {
logger.error("Failed to send error response for watcher on
group {}", group, inner);
}
}
```
##########
common/src/main/java/org/apache/seata/common/metadata/ClusterWatchEvent.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.metadata;
+
+/**
+ * Cluster watch event data class.
+ * Simplified format: only contains group, timestamp, and full metadata.
+ *
+ * <p>Event format:
+ * <pre>
+ *
{"group":"default","timestamp":1234567890,"metadata":{"nodes":[...],"storeMode":"raft","term":2}}
+ * </pre>
+ *
+ * <p>Client can determine if update is needed by comparing metadata.term with
local term.
+ * No need for separate event type field since all events contain full
metadata.
+ *
+ * @see org.apache.seata.common.util.SeataHttpWatch
+ */
+public class ClusterWatchEvent {
+
+ private String group;
+
+ private Long timestamp;
+
+ private MetadataResponse metadata;
+
+ public ClusterWatchEvent() {}
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public MetadataResponse getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(MetadataResponse metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterWatchEvent{" + "group='"
+ + group + '\'' + ", timestamp="
+ + timestamp + ", metadata="
+ + metadata + '}';
Review Comment:
Default toString(): MetadataResponse inherits toString() from Object, and so
is not suitable for printing.
```suggestion
String metadataString = metadata == null
? "null"
: "MetadataResponse{term=" + metadata.getTerm() + "}";
return "ClusterWatchEvent{" + "group='"
+ group + '\'' + ", timestamp="
+ timestamp + ", metadata="
+ metadataString + '}';
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,31 +248,131 @@ private void sendWatcherResponse(Watcher<HttpContext>
watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // For HTTP/2, headers must be sent first on the initial response
+ if (sendHeaders) {
Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream;
charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
- }
- });
+ String group = watcher.getGroup();
+ String eventData = buildEventData(group);
+ ByteBuf content = Unpooled.copiedBuffer(eventData,
StandardCharsets.UTF_8);
+
+ // Send DATA frame (if closeStream is true, it will end the current
stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ /**
+ * Get current cluster metadata for the given group.
+ * This method extracts the logic from ClusterController#cluster to avoid
circular dependency.
+ *
+ * @param group the group name
+ * @return the MetadataResponse containing current cluster metadata
+ */
+ public MetadataResponse getMetadataResponse(String group) {
+ MetadataResponse metadataResponse = new MetadataResponse();
+ if (StringUtils.isBlank(group)) {
+ group = ConfigurationFactory.getInstance()
+ .getConfig(ConfigurationKeys.SERVER_RAFT_GROUP,
DEFAULT_SEATA_GROUP);
+ }
+ RaftServer raftServer = RaftServerManager.getRaftServer(group);
+ if (raftServer != null) {
+ String mode =
ConfigurationFactory.getInstance().getConfig(STORE_MODE);
+ metadataResponse.setStoreMode(mode);
+ RouteTable routeTable = RouteTable.getInstance();
+ try {
+
routeTable.refreshLeader(RaftServerManager.getCliClientServiceInstance(),
group, 1000);
+ PeerId leader = routeTable.selectLeader(group);
+ if (leader != null) {
+ Set<Node> nodes = new HashSet<>();
+ RaftClusterMetadata raftClusterMetadata =
+
raftServer.getRaftStateMachine().getRaftLeaderMetadata();
+ Node leaderNode = raftServer
+ .getRaftStateMachine()
+ .getRaftLeaderMetadata()
+ .getLeader();
+ leaderNode.setGroup(group);
+ nodes.add(leaderNode);
+ nodes.addAll(raftClusterMetadata.getLearner());
+ nodes.addAll(raftClusterMetadata.getFollowers());
+ metadataResponse.setTerm(raftClusterMetadata.getTerm());
+ metadataResponse.setNodes(new ArrayList<>(nodes));
+ }
+ } catch (Exception e) {
+ logger.error("Failed to get cluster metadata for group {}:
{}", group, e.getMessage(), e);
+ }
+ }
+ return metadataResponse;
+ }
+
+ /**
+ * Build event data with simplified format: only group, timestamp, and
metadata.
+ * For HTTP/2 connections, this will send the full MetadataResponse.
+ *
+ * @param group the group name
+ * @return the event data string with prefix
+ */
+ private String buildEventData(String group) {
+ try {
+ // Get current MetadataResponse
+ MetadataResponse metadataResponse = getMetadataResponse(group);
+
+ // Build simplified JSON: only group, timestamp, and metadata
+ String json = String.format(
+ "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":%s}",
+ group,
+ System.currentTimeMillis(),
+ OBJECT_MAPPER.writeValueAsString(metadataResponse));
+
+ logger.debug("Sending watch event: group={}, term={}", group,
metadataResponse.getTerm());
+ return Constants.WATCH_EVENT_PREFIX + json + "\n";
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to serialize MetadataResponse for group {}:
{}", group, e.getMessage(), e);
+ // Fallback: send minimal data
+ String json = String.format(
+ "{\"group\":\"%s\",\"timestamp\":%d,\"metadata\":null}",
+ group, System.currentTimeMillis());
+ return Constants.WATCH_EVENT_PREFIX + json + "\n";
}
}
public void registryWatcher(Watcher<HttpContext> watcher) {
String group = watcher.getGroup();
Long term = GROUP_UPDATE_TERM.get(group);
- if (term == null || watcher.getTerm() >= term) {
- WATCHERS.computeIfAbsent(group, value -> new
ConcurrentLinkedQueue<>())
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context.isHttp2();
+
+ // For HTTP/2, must send response headers immediately, cannot delay
+ if (isHttp2 && !HTTP2_HEADERS_SENT.getOrDefault(watcher, false)) {
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false, true);
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
+
+ if (isHttp2) {
+ HTTP2_WATCHERS
+ .computeIfAbsent(group, value -> new
ConcurrentLinkedQueue<>())
.add(watcher);
+
+ // If term has been updated, notify immediately
+ if (term != null && term > watcher.getTerm()) {
+ notifyWatcher(watcher, null);
Review Comment:
There's a potential race condition between lines 353-356 in
`registryWatcher` and lines 200-204 in `notifyWatcher`. If `registryWatcher` is
called for an HTTP/2 watcher and simultaneously `notifyWatcher` is called (line
365), both might check `HTTP2_HEADERS_SENT.getOrDefault(watcher, false)` and
find it false, causing headers to potentially be sent twice. While
ConcurrentHashMap prevents data corruption, the logical race could result in
duplicate header frames being sent. Consider using `computeIfAbsent` or
synchronized blocks to ensure atomicity of the check-and-set operation.
##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+ implements Iterator<SeataHttpWatch.Response<T>>,
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeataHttpWatch.class);
+
+ private final ResponseBody responseBody;
+ private final BufferedSource source;
+ private final Call call;
+ private final ObjectMapper objectMapper;
+ private final Class<T> eventType;
+
+ /**
+ * Response wrapper containing event data.
+ * Since event format is simplified (no type field), all successful events
are treated the same.
+ * Client can determine the nature of the event by comparing metadata.term.
+ */
+ public static class Response<T> {
+ /**
+ * Response type enum
+ */
+ public enum Type {
+ /**
+ * Normal event with data (connection established or cluster
changed)
+ */
+ UPDATE,
+ /**
+ * Error event (parse error or connection error)
+ */
+ ERROR
+ }
+
+ public final Type type;
+
+ public final T object;
+
+ public Response(Type type, T object) {
+ this.type = type;
+ this.object = object;
+ }
+ }
+
+ /**
+ * Create a Watch instance from an OkHttp call
+ *
+ * @param call the prepared HTTP call
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance
+ * @throws IOException if the request fails
+ */
+ public static <T> SeataHttpWatch<T> createWatch(Call call, Class<T>
eventType) throws IOException {
+
+ okhttp3.Response response = call.execute();
+
+ if (!response.isSuccessful()) {
+ String respBody = null;
+ try (ResponseBody body = response.body()) {
+ if (body != null) {
+ respBody = body.string();
+ }
+ } catch (IOException e) {
+ throw new FrameworkException(e, "Watch request failed: " +
response.message());
+ }
+ throw new FrameworkException(
+ String.format("Watch request failed with code %d: %s",
response.code(), respBody));
+ }
+
+ // Verify Content-Type is event stream
+ String contentType = response.header("Content-Type");
+ if (contentType == null || !contentType.contains("text/event-stream"))
{
+ LOGGER.warn("Expected Content-Type: text/event-stream, got: {}",
contentType);
+ }
+
+ return new SeataHttpWatch<>(response.body(), call, eventType);
Review Comment:
The `createWatch` method on line 90 executes the HTTP call and creates a
SeataHttpWatch instance with the response body. However, if an exception occurs
after line 92 but before line 113, the response object is never closed, leading
to a resource leak. The response should be closed in a finally block or using
try-with-resources to ensure proper cleanup, especially since the
response.body() is passed to the SeataHttpWatch constructor.
```suggestion
boolean watchCreated = false;
try {
if (!response.isSuccessful()) {
String respBody = null;
try (ResponseBody body = response.body()) {
if (body != null) {
respBody = body.string();
}
} catch (IOException e) {
throw new FrameworkException(e, "Watch request failed: "
+ response.message());
}
throw new FrameworkException(
String.format("Watch request failed with code %d:
%s", response.code(), respBody));
}
// Verify Content-Type is event stream
String contentType = response.header("Content-Type");
if (contentType == null ||
!contentType.contains("text/event-stream")) {
LOGGER.warn("Expected Content-Type: text/event-stream, got:
{}", contentType);
}
SeataHttpWatch<T> watch = new SeataHttpWatch<>(response.body(),
call, eventType);
watchCreated = true;
return watch;
} finally {
if (!watchCreated) {
ResponseBody body = response.body();
if (body != null) {
body.close();
} else {
response.close();
}
}
}
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
@Async
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
- GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
- Optional.ofNullable(WATCHERS.remove(event.getGroup()))
- .ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
+ String group = event.getGroup();
+ Long eventTerm = event.getTerm();
+
+ Long currentTerm = GROUP_UPDATE_TERM.get(group);
+ if (currentTerm != null && eventTerm <= currentTerm) {
+ logger.info(
+ "Discarding outdated event with term {} for group {},
current term is {}",
+ eventTerm,
+ group,
+ currentTerm);
+ return;
+ }
+
+ GROUP_UPDATE_TERM.put(group, eventTerm);
+
+ // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+ Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+ .ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
+ notifyWatcher(watcher, eventTerm);
+ // HTTP/1.1 watcher is done after notification
+ watcher.setDone(true);
+ }));
+
+ // Handle HTTP/2 watchers: notify without removing (long-lived
connection)
+ Queue<Watcher<HttpContext>> http2Watchers =
HTTP2_WATCHERS.get(group);
+ if (http2Watchers != null && !http2Watchers.isEmpty()) {
+ List<Watcher<HttpContext>> watchersToNotify = new
ArrayList<>(http2Watchers);
+ watchersToNotify.forEach(watcher -> {
+ if
(watcher.getAsyncContext().getContext().channel().isActive() &&
!watcher.isDone()) {
+ if (eventTerm > watcher.getTerm()) {
+ notifyWatcher(watcher, eventTerm);
+ } else {
+ logger.info(
+ "Skipping notification for watcher with
term {} >= event term {} for group {}",
+ watcher.getTerm(),
+ eventTerm,
+ group);
+ }
Review Comment:
The logger.info statement on lines 176-180 uses a positional argument format
but the condition on line 173 compares `eventTerm > watcher.getTerm()`, meaning
this log will only execute when the condition is false (i.e., when `eventTerm
<= watcher.getTerm()`). However, the log message says "watcher with term {} >=
event term {}", which would be confusing. The message should be adjusted to
clarify it's logging when the notification is skipped because the watcher
already has an equal or newer term.
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +215,101 @@ private static String buildUrlWithParams(String url,
Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request,
final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ /**
+ * Create an HTTP/2 client for watch connections.
+ * This client is configured for long-lived connections to receive
Server-Sent Events (SSE).
+ * The client instances are cached and reused based on the connection
timeout to improve performance.
+ *
+ * @param connectTimeoutSeconds connection timeout in seconds (fast
failure if server is unreachable)
+ * @return configured OkHttpClient instance (cached and reused)
+ */
+ private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
+ return HTTP2_CLIENT_MAP.computeIfAbsent(connectTimeoutSeconds, k ->
new OkHttpClient.Builder()
+
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ // Fast failure during connection phase
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ // Infinite read timeout to allow continuous listening for
server push
+ .readTimeout(0, TimeUnit.SECONDS)
+ .writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ .build());
Review Comment:
The HTTP2_CLIENT_MAP uses `connectTimeoutSeconds` as the key (line 227), but
the only place this is called is with the constant
`HTTP2_WATCH_CONNECT_TIMEOUT_SECONDS` which is 30 seconds. This means there
will only ever be one entry in this map. Consider whether the caching logic
with a map is necessary, or if a simple static field would be more appropriate.
The comment on line 221 mentions "cached and reused based on the connection
timeout" but in practice, the timeout never varies.
```suggestion
* Singleton HTTP/2 client for watch connections.
* This client is configured for long-lived connections to receive
Server-Sent Events (SSE)
* and is cached and reused to avoid repeatedly creating identical
clients.
*/
private static volatile OkHttpClient HTTP2_WATCH_CLIENT;
/**
* Create (on first use) or retrieve the HTTP/2 client for watch
connections.
* <p>
* The first invocation determines the connection timeout used to
configure the client; subsequent
* calls return the same cached instance, regardless of the timeout
value passed in.
*
* @param connectTimeoutSeconds connection timeout in seconds (fast
failure if server is unreachable)
* @return configured OkHttpClient instance (cached and reused)
*/
private static OkHttpClient createHttp2WatchClient(int
connectTimeoutSeconds) {
OkHttpClient client = HTTP2_WATCH_CLIENT;
if (client == null) {
synchronized (HttpClientUtil.class) {
client = HTTP2_WATCH_CLIENT;
if (client == null) {
client = new OkHttpClient.Builder()
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
// Fast failure during connection phase
.connectTimeout(connectTimeoutSeconds,
TimeUnit.SECONDS)
// Infinite read timeout to allow continuous
listening for server push
.readTimeout(0, TimeUnit.SECONDS)
.writeTimeout(connectTimeoutSeconds,
TimeUnit.SECONDS)
.build();
HTTP2_WATCH_CLIENT = client;
}
}
}
return client;
```
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
@Async
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
- GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
- Optional.ofNullable(WATCHERS.remove(event.getGroup()))
- .ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
+ String group = event.getGroup();
+ Long eventTerm = event.getTerm();
Review Comment:
The variable 'eventTerm' is only assigned values of primitive type and is
never 'null', but it is declared with the boxed type 'Long'.
```suggestion
long eventTerm = event.getTerm();
```
##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+ implements Iterator<SeataHttpWatch.Response<T>>,
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeataHttpWatch.class);
+
+ private final ResponseBody responseBody;
+ private final BufferedSource source;
+ private final Call call;
+ private final ObjectMapper objectMapper;
+ private final Class<T> eventType;
+
+ /**
+ * Response wrapper containing event data.
+ * Since event format is simplified (no type field), all successful events
are treated the same.
+ * Client can determine the nature of the event by comparing metadata.term.
+ */
+ public static class Response<T> {
+ /**
+ * Response type enum
+ */
+ public enum Type {
+ /**
+ * Normal event with data (connection established or cluster
changed)
+ */
+ UPDATE,
+ /**
+ * Error event (parse error or connection error)
+ */
+ ERROR
+ }
+
+ public final Type type;
+
+ public final T object;
+
+ public Response(Type type, T object) {
+ this.type = type;
+ this.object = object;
+ }
+ }
+
+ /**
+ * Create a Watch instance from an OkHttp call
+ *
+ * @param call the prepared HTTP call
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance
+ * @throws IOException if the request fails
+ */
+ public static <T> SeataHttpWatch<T> createWatch(Call call, Class<T>
eventType) throws IOException {
+
+ okhttp3.Response response = call.execute();
+
+ if (!response.isSuccessful()) {
+ String respBody = null;
+ try (ResponseBody body = response.body()) {
+ if (body != null) {
+ respBody = body.string();
+ }
+ } catch (IOException e) {
+ throw new FrameworkException(e, "Watch request failed: " +
response.message());
+ }
+ throw new FrameworkException(
+ String.format("Watch request failed with code %d: %s",
response.code(), respBody));
+ }
+
+ // Verify Content-Type is event stream
+ String contentType = response.header("Content-Type");
+ if (contentType == null || !contentType.contains("text/event-stream"))
{
+ LOGGER.warn("Expected Content-Type: text/event-stream, got: {}",
contentType);
+ }
+
+ return new SeataHttpWatch<>(response.body(), call, eventType);
+ }
+
+ /**
+ * Create a Watch instance with a prepared request
+ *
+ * @param client the OkHttpClient instance
+ * @param request the HTTP request
+ * @param eventType the class type for deserializing event data
+ * @param <T> the event data type
+ * @return a Watch instance
+ * @throws IOException if the request fails
+ */
+ public static <T> SeataHttpWatch<T> createWatch(OkHttpClient client,
Request request, Class<T> eventType)
+ throws IOException {
+
+ Call call = client.newCall(request);
+ return createWatch(call, eventType);
+ }
+
+ private SeataHttpWatch(ResponseBody responseBody, Call call, Class<T>
eventType) {
+ this.responseBody = responseBody;
+ this.source = responseBody.source();
+ this.call = call;
+ this.objectMapper = new ObjectMapper();
+ this.eventType = eventType;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ // Check if source is exhausted (stream closed)
+ return !source.exhausted();
+ } catch (IOException e) {
+ LOGGER.error("Error checking if stream has more data", e);
+ return false;
+ }
+ }
+
+ @Override
+ public Response<T> next() {
+ try {
+ /*
+ Read a single line and parse it as an event.
+ Format: "{prefix}{json}\n" where prefix is defined in
Constants.WATCH_EVENT_PREFIX.
+ Each line is a complete event, event type is included in the JSON
data.
+ */
+ String line = source.readUtf8Line();
+ if (line == null) {
+ throw new RuntimeException("Stream closed unexpectedly");
+ }
+
+ if (!line.startsWith(Constants.WATCH_EVENT_PREFIX)) {
+ throw new RuntimeException("Invalid event format: expected
prefix '" + Constants.WATCH_EVENT_PREFIX
+ + "', got: " + (line.length() > 20 ? line.substring(0,
20) + "..." : line));
+ }
+
+ String jsonData =
line.substring(Constants.WATCH_EVENT_PREFIX.length());
+ return parseEvent(jsonData);
+
+ } catch (IOException e) {
+ throw new RuntimeException("IO Exception during next()", e);
+ }
+ }
+
+ /**
+ * Parse event JSON into Response object.
+ * Simplified format: only contains group, timestamp, and metadata fields.
+ *
+ * @param json the JSON string to parse
+ * @return the parsed Response object
+ * @throws IOException if parsing fails
+ */
+ private Response<T> parseEvent(String json) throws IOException {
+ try {
+ T eventData = objectMapper.readValue(json, eventType);
+ return new Response<>(Response.Type.UPDATE, eventData);
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse event JSON: {}", json, e);
+ // Return error response
+ return new Response<>(Response.Type.ERROR, null);
+ }
Review Comment:
The `parseEvent` method catches a broad `Exception` on line 191, which
includes both checked and unchecked exceptions. This could potentially catch
and suppress serious errors like `OutOfMemoryError` or `StackOverflowError`.
Consider catching only `JsonProcessingException` or `IOException` to handle
parsing failures specifically, while allowing critical JVM errors to propagate.
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -89,36 +141,104 @@ public void init() {
@Async
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
- GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
- Optional.ofNullable(WATCHERS.remove(event.getGroup()))
- .ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
+ String group = event.getGroup();
+ Long eventTerm = event.getTerm();
+
+ Long currentTerm = GROUP_UPDATE_TERM.get(group);
+ if (currentTerm != null && eventTerm <= currentTerm) {
+ logger.info(
+ "Discarding outdated event with term {} for group {},
current term is {}",
+ eventTerm,
+ group,
+ currentTerm);
+ return;
+ }
+
+ GROUP_UPDATE_TERM.put(group, eventTerm);
+
+ // Handle HTTP/1.1 watchers: remove and notify (one-time request)
+ Optional.ofNullable(HTTP1_WATCHERS.remove(group))
+ .ifPresent(watchers ->
watchers.parallelStream().forEach(watcher -> {
+ notifyWatcher(watcher, eventTerm);
+ // HTTP/1.1 watcher is done after notification
+ watcher.setDone(true);
+ }));
+
+ // Handle HTTP/2 watchers: notify without removing (long-lived
connection)
+ Queue<Watcher<HttpContext>> http2Watchers =
HTTP2_WATCHERS.get(group);
+ if (http2Watchers != null && !http2Watchers.isEmpty()) {
+ List<Watcher<HttpContext>> watchersToNotify = new
ArrayList<>(http2Watchers);
+ watchersToNotify.forEach(watcher -> {
+ if
(watcher.getAsyncContext().getContext().channel().isActive() &&
!watcher.isDone()) {
+ if (eventTerm > watcher.getTerm()) {
+ notifyWatcher(watcher, eventTerm);
+ } else {
+ logger.info(
+ "Skipping notification for watcher with
term {} >= event term {} for group {}",
+ watcher.getTerm(),
+ eventTerm,
+ group);
+ }
+ } else {
+ // Remove inactive watcher
+ http2Watchers.remove(watcher);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ }
+ });
Review Comment:
The iteration through `watchersToNotify` checks if watchers are active and
not done, but on line 184, it attempts to remove from the original
`http2Watchers` queue. Since `watchersToNotify` is a snapshot copy created on
line 170, the `watcher` reference in the forEach on line 171 might not
successfully remove the correct element from `http2Watchers` in all cases,
especially if the queue has been modified concurrently. Consider tracking
watchers to remove and performing a batch removal after the iteration, or using
an iterator on the original queue with proper synchronization.
##########
common/src/main/java/org/apache/seata/common/util/SeataHttpWatch.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.common.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.Call;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.ResponseBody;
+import okio.BufferedSource;
+import org.apache.seata.common.Constants;
+import org.apache.seata.common.exception.FrameworkException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Seata HTTP/2 Watch implementation.
+ * Consumes server-pushed event stream via an iterator-style API.
+ * Each line contains a single event in "data: {json}" format, similar to
Kubernetes watch API.
+ *
+ * @param <T> event data type
+ */
+public class SeataHttpWatch<T>
+ implements Iterator<SeataHttpWatch.Response<T>>,
Iterable<SeataHttpWatch.Response<T>>, AutoCloseable {
Review Comment:
This Iterable is its own Iterator, but does not guard against multiple
iterations.
```suggestion
implements Iterator<SeataHttpWatch.Response<T>>, AutoCloseable {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]