YvCeung commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2665164922
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
     private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if (!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);
Review Comment:
This has now been reworked to address the potential loss issue. HTTP/1.1 and
HTTP/2 watchers are stored in separate queues. HTTP/2 watchers are no longer
removed and re-registered on every scheduled run; the scheduled task only
checks whether each watcher's channel is still active and removes the inactive
ones to prevent memory leaks.
```java
// Separate watchers for HTTP/1.1 (one-time requests) and HTTP/2 (long-lived connections)
private static final Map<String, Queue<Watcher<HttpContext>>> HTTP1_WATCHERS = new ConcurrentHashMap<>();
private static final Map<String, Queue<Watcher<HttpContext>>> HTTP2_WATCHERS = new ConcurrentHashMap<>();

    // Check HTTP/2 watchers for connection validity (don't remove, just check)
    for (Map.Entry<String, Queue<Watcher<HttpContext>>> entry : HTTP2_WATCHERS.entrySet()) {
        String group = entry.getKey();
        Queue<Watcher<HttpContext>> watchers = entry.getValue();
        if (watchers == null || watchers.isEmpty()) {
            continue;
        }
        // Create snapshot to avoid concurrent modification
        List<Watcher<HttpContext>> watchersToCheck = new ArrayList<>(watchers);
        watchersToCheck.forEach(watcher -> {
            HttpContext context = watcher.getAsyncContext();
            if (!context.getContext().channel().isActive()) {
                // Remove invalid watcher
                watchers.remove(watcher);
                watcher.setDone(true);
                HTTP2_HEADERS_SENT.remove(watcher);
                logger.debug("Removed inactive HTTP/2 watcher for group: {}", group);
            }
        });
    }
},
```
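
For illustration only, here is a minimal self-contained sketch of the same prune-in-place idea. It assumes the per-group queues are `ConcurrentLinkedQueue` instances, which the snippet above does not state. `StubWatcher`, `register`, and `pruneInactive` are hypothetical names that stand in for `Watcher<HttpContext>` and the channel check; they are not part of the PR. Under that assumption the weakly consistent iterator can remove entries directly, so the snapshot copy may not be needed.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class Http2WatcherPruneSketch {

    /** Hypothetical stand-in for Watcher<HttpContext>; it only models channel liveness. */
    static final class StubWatcher {
        private final boolean channelActive;
        private volatile boolean done;

        StubWatcher(boolean channelActive) {
            this.channelActive = channelActive;
        }

        boolean isChannelActive() {
            return channelActive;
        }

        boolean isDone() {
            return done;
        }

        void setDone(boolean done) {
            this.done = done;
        }
    }

    static final Map<String, Queue<StubWatcher>> HTTP2_WATCHERS = new ConcurrentHashMap<>();

    /** Adds a watcher to its group's queue, creating the queue on first use. */
    static void register(String group, StubWatcher watcher) {
        HTTP2_WATCHERS.computeIfAbsent(group, g -> new ConcurrentLinkedQueue<>()).add(watcher);
    }

    /** Removes watchers whose channel is inactive and returns how many were removed. */
    static int pruneInactive() {
        int removed = 0;
        for (Queue<StubWatcher> watchers : HTTP2_WATCHERS.values()) {
            // ConcurrentLinkedQueue iterators are weakly consistent and support remove(),
            // so concurrent registration does not throw ConcurrentModificationException.
            for (Iterator<StubWatcher> it = watchers.iterator(); it.hasNext();) {
                StubWatcher watcher = it.next();
                if (!watcher.isChannelActive()) {
                    it.remove();
                    watcher.setDone(true);
                    removed++;
                }
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        register("default", new StubWatcher(true));
        register("default", new StubWatcher(false));
        System.out.println("removed=" + pruneInactive());
    }
}
```

Active watchers stay in the queue across scheduled runs, so a cluster change arriving between runs still finds them registered.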
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]