funky-eyes commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2666816518
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
     private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
+    private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
     private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
             new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
     @PostConstruct
     public void init() {
-        // Responds to monitors that time out
+        // Periodically check and respond to watchers that have timed out
         scheduledThreadPoolExecutor.scheduleAtFixedRate(
                 () -> {
                     for (String group : WATCHERS.keySet()) {
                         Optional.ofNullable(WATCHERS.remove(group))
                                 .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
-                                    if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                                        watcher.setDone(true);
-                                        sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
-                                    }
-                                    if (!watcher.isDone()) {
-                                        // Re-register
-                                        registryWatcher(watcher);
+                                    HttpContext context = watcher.getAsyncContext();
+                                    boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+                                    if (isHttp2) {
+                                        if (!context.getContext().channel().isActive()) {
+                                            watcher.setDone(true);
+                                            HTTP2_HEADERS_SENT.remove(watcher);
+                                        } else {
+                                            registryWatcher(watcher);
Review Comment:
> Optimizations have now been made to address the potential loss issue.
> HTTP/1 and HTTP/2 watchers are stored in separate queues. HTTP/2 watchers
> are no longer removed and re-registered periodically; the scheduled task
> only checks whether each channel is still active, to prevent memory leaks.
>
> ```java
> // Separate watchers for HTTP/1.1 (one-time requests) and HTTP/2 (long-lived connections)
> private static final Map<String, Queue<Watcher<HttpContext>>> HTTP1_WATCHERS = new ConcurrentHashMap<>();
> private static final Map<String, Queue<Watcher<HttpContext>>> HTTP2_WATCHERS = new ConcurrentHashMap<>();
>
> // Check HTTP/2 watchers for connection validity (don't remove, just check)
> for (Map.Entry<String, Queue<Watcher<HttpContext>>> entry : HTTP2_WATCHERS.entrySet()) {
>     String group = entry.getKey();
>     Queue<Watcher<HttpContext>> watchers = entry.getValue();
>     if (watchers == null || watchers.isEmpty()) {
>         continue;
>     }
>
>     // Create snapshot to avoid concurrent modification
>     List<Watcher<HttpContext>> watchersToCheck = new ArrayList<>(watchers);
>
>     watchersToCheck.forEach(watcher -> {
>         HttpContext context = watcher.getAsyncContext();
>         if (!context.getContext().channel().isActive()) {
>             // Remove invalid watcher
>             watchers.remove(watcher);
>             watcher.setDone(true);
>             HTTP2_HEADERS_SENT.remove(watcher);
>             logger.debug("Removed inactive HTTP/2 watcher for group: {}", group);
>         }
>     });
> }
> },
> ```
We also need to consider another issue. Since the response is written back
over HTTP/2 + SSE, is there a thread-safety concern with concurrent writes to
the same SSE response? Could events also be delivered out of order? For
example, if two events are generated and event 1 reaches the client after
event 2, the older event could overwrite the newer one.
To address this, should we ensure that events are delivered in order and add
term validation? Specifically, if the term of an event already written to the
response is greater than that of a later event, the later event should be
discarded.
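
One way to sketch the ordering and term check described above. This is only an illustration under stated assumptions, not the PR's implementation: `TermGatedEmitter` and its `emit` method are hypothetical names, and the `Consumer<String>` sink stands in for the real write to the HTTP/2 stream. Writes are serialized with a lock, and an event is dropped when a greater term has already been written to the same stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Seata): serializes writes to one SSE stream
 * and discards events whose term is lower than the last term already written.
 */
final class TermGatedEmitter {
    private final Consumer<String> sink; // assumption: stands in for the real channel write
    private long lastSentTerm = -1L;     // guarded by "this"

    TermGatedEmitter(Consumer<String> sink) {
        this.sink = sink;
    }

    /** Returns true if the event was written, false if it was discarded as stale. */
    synchronized boolean emit(long term, String payload) {
        if (term < lastSentTerm) {
            return false; // a newer term already reached this stream
        }
        lastSentTerm = term;
        // SSE frame: a "data:" line terminated by a blank line
        sink.accept("data: " + payload + "\n\n");
        return true;
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        TermGatedEmitter emitter = new TermGatedEmitter(written::add);
        System.out.println(emitter.emit(2, "term-2")); // true
        System.out.println(emitter.emit(1, "term-1")); // false, older than term 2
        System.out.println(emitter.emit(3, "term-3")); // true
        System.out.println(written.size());            // 2
    }
}
```

Holding the lock across both the term comparison and the write keeps the check and the send atomic per stream. It assumes the underlying write preserves the order in which it is called; one emitter per watcher would be needed so that streams do not block each other.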
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]