YvCeung commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659727977
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -90,35 +103,68 @@ public void init() {
public void onChangeEvent(ClusterChangeEvent event) {
if (event.getTerm() > 0) {
GROUP_UPDATE_TERM.put(event.getGroup(), event.getTerm());
- // Notifications are made of changes in cluster information
+ // Notify all watchers of cluster information changes
Optional.ofNullable(WATCHERS.remove(event.getGroup()))
.ifPresent(watchers ->
watchers.parallelStream().forEach(this::notifyWatcher));
}
}
private void notifyWatcher(Watcher<HttpContext> watcher) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.OK);
- }
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+
+ if (!isHttp2) {
+ watcher.setDone(true);
+ }
+
+ boolean isFirstResponse = !HTTP2_HEADERS_SENT.getOrDefault(watcher,
false);
+ sendWatcherResponse(watcher, HttpResponseStatus.OK, false,
isFirstResponse);
+ if (isFirstResponse && isHttp2) {
+ HTTP2_HEADERS_SENT.put(watcher, true);
+ }
- private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
+ // Update watcher's term to the latest term to prevent infinite loop
+ // This ensures that when registryWatcher is called, it won't trigger
notifyWatcher again
String group = watcher.getGroup();
+ Long latestTerm = GROUP_UPDATE_TERM.get(group);
+ if (latestTerm != null && latestTerm > watcher.getTerm()) {
+ watcher.setTerm(latestTerm);
+ }
+
+ // For HTTP/2, re-register the watcher to continue listening for
future updates
+ if (isHttp2 && !watcher.isDone()) {
+ registryWatcher(watcher);
+ }
+ }
+ /**
+ * Send watcher response to the client.
+ *
+ * @param watcher the watcher instance
+ * @param nettyStatus the HTTP status code
+ * @param closeStream whether to close the HTTP/2 stream (endStream=true)
+ * @param sendHeaders whether to send HTTP/2 headers frame (only needed
for first response)
+ */
+ private void sendWatcherResponse(
+ Watcher<HttpContext> watcher, HttpResponseStatus nettyStatus,
boolean closeStream, boolean sendHeaders) {
+
HttpContext context = watcher.getAsyncContext();
if (!(context instanceof HttpContext)) {
logger.warn(
"Unsupported context type for watcher on group {}: {}",
- group,
+ watcher.getGroup(),
context != null ? context.getClass().getName() : "null");
return;
}
Review Comment:
I think type checking here is necessary, so the historical logic status quo
has been retained, but the format of the printed logs has been modified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]