funky-eyes commented on code in PR #7903:
URL: https://github.com/apache/incubator-seata/pull/7903#discussion_r2659252630
##########
common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java:
##########
@@ -265,25 +215,101 @@ private static String buildUrlWithParams(String url, Map<String, String> params)
return urlBuilder.toString();
}
- private static void executeAsync(OkHttpClient client, Request request, final HttpCallback<Response> callback) {
- client.newCall(request).enqueue(new Callback() {
- @Override
- public void onResponse(Call call, Response response) {
- try {
- callback.onSuccess(response);
- } finally {
- response.close();
- }
- }
+ /**
+ * Create an HTTP/2 client for watch connections.
+ * This client is configured for long-lived connections to receive Server-Sent Events (SSE).
+ * The client instances are cached and reused based on the connection timeout to improve performance.
+ *
+ * @param connectTimeoutSeconds connection timeout in seconds (fast failure if server is unreachable)
+ * @return configured OkHttpClient instance (cached and reused)
+ */
+ private static OkHttpClient createHttp2WatchClient(int connectTimeoutSeconds) {
+ return HTTP2_CLIENT_MAP.computeIfAbsent(connectTimeoutSeconds, k -> new OkHttpClient.Builder()
+ .protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
+ // Fast failure during connection phase
+ .connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
+ // Infinite read timeout to allow continuous listening for server push
+ .readTimeout(0, TimeUnit.SECONDS)
Review Comment:
We should make the read timeout configurable instead of hardcoding it to 0.
With readTimeout set to 0, the underlying socket timeout is infinite, so a
read can block indefinitely. If the server goes away abruptly (e.g., a crash
where the TCP connection is never closed gracefully), the client may stay
blocked permanently.
The connectTimeout should be capped at around 10 seconds, while the
readTimeout should be adjustable per request type.
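
To illustrate the concern at the socket level, here is a minimal JDK-only sketch. It does not use OkHttp or any Seata code; `ReadTimeoutDemo` and `readTimesOut` are hypothetical names introduced for this example. A loopback peer accepts the connection but never writes, which approximates a server that has silently gone away. With a finite `SO_TIMEOUT` the read fails fast; with 0 the same read would wait forever.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical demo class, not part of the PR.
final class ReadTimeoutDemo {

    /**
     * Connects to a loopback peer that never sends data and attempts one read.
     *
     * @param readTimeoutMillis must be greater than 0; a value of 0 means
     *                          "no timeout" and this call would never return
     * @return true if the read was aborted by a SocketTimeoutException
     */
    static boolean readTimesOut(int readTimeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket silentPeer = server.accept()) {
            // Finite read timeout: a blocked read is interrupted after this many ms.
            client.setSoTimeout(readTimeoutMillis);
            try {
                client.getInputStream().read(); // the peer never writes
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a watch stream, a finite read timeout only helps if the server sends something (for example a periodic keepalive event) more often than the timeout, so the two values would need to be chosen together.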
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -57,24 +59,35 @@ public class ClusterWatcherManager implements ClusterChangeListener {
private static final Map<String, Long> GROUP_UPDATE_TERM = new ConcurrentHashMap<>();
+ private static final Map<Watcher<HttpContext>, Boolean> HTTP2_HEADERS_SENT = new ConcurrentHashMap<>();
+
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("long-polling", 1));
@PostConstruct
public void init() {
- // Responds to monitors that time out
+ // Periodically check and respond to watchers that have timed out
scheduledThreadPoolExecutor.scheduleAtFixedRate(
() -> {
for (String group : WATCHERS.keySet()) {
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
- if (System.currentTimeMillis() >= watcher.getTimeout()) {
- watcher.setDone(true);
- sendWatcherResponse(watcher, HttpResponseStatus.NOT_MODIFIED);
- }
- if (!watcher.isDone()) {
- // Re-register
- registryWatcher(watcher);
+ HttpContext context = watcher.getAsyncContext();
+ boolean isHttp2 = context instanceof HttpContext && context.isHttp2();
+ if (isHttp2) {
+ if (!context.getContext().channel().isActive()) {
+ watcher.setDone(true);
+ HTTP2_HEADERS_SENT.remove(watcher);
+ } else {
+ registryWatcher(watcher);
Review Comment:
If the cluster state changes again in the window between a watcher being
removed from the subscription queue and being re-registered, is there a risk,
in extreme cases, that the notification is lost?
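
One way to close such a window is to compare terms at registration time, so that a watcher whose known term is behind the group's latest term is notified immediately instead of being queued. The sketch below is JDK-only and purely illustrative: `TermCheckedRegistry`, its nested `Watcher`, and `onChange` are hypothetical stand-ins, and it makes no claim about how `registryWatcher` behaves in the PR.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the ClusterWatcherManager implementation.
final class TermCheckedRegistry {

    static final class Watcher {
        final String group;
        volatile long term; // last cluster term this watcher has seen
        final AtomicInteger notifications = new AtomicInteger();

        Watcher(String group, long term) {
            this.group = group;
            this.term = term;
        }
    }

    private final Map<String, Long> groupTerm = new ConcurrentHashMap<>();
    private final Map<String, Queue<Watcher>> watchers = new ConcurrentHashMap<>();

    /** Records the new term first, then drains and notifies queued watchers. */
    void onChange(String group, long newTerm) {
        groupTerm.merge(group, newTerm, Math::max);
        Queue<Watcher> queue = watchers.remove(group);
        if (queue == null) {
            return;
        }
        for (Watcher w = queue.poll(); w != null; w = queue.poll()) {
            notifyWatcher(w);
        }
    }

    /** Queues the watcher unless it is already behind the latest term. */
    void register(Watcher w) {
        if (isStale(w)) {
            notifyWatcher(w);
            return;
        }
        Queue<Watcher> queue = watchers.computeIfAbsent(w.group, g -> new ConcurrentLinkedQueue<>());
        queue.add(w);
        // Re-check after enqueueing: a change may have landed between the first
        // check and the add. remove() returns true for only one caller, so the
        // watcher is not notified twice if onChange drained it concurrently.
        if (isStale(w) && queue.remove(w)) {
            notifyWatcher(w);
        }
    }

    private boolean isStale(Watcher w) {
        Long latest = groupTerm.get(w.group);
        return latest != null && latest > w.term;
    }

    private void notifyWatcher(Watcher w) {
        Long latest = groupTerm.get(w.group);
        if (latest != null) {
            w.term = latest;
        }
        w.notifications.incrementAndGet(); // stand-in for writing to the stream
    }
}
```

With this shape, a change that lands while a watcher is outside the queue is picked up the next time the watcher registers, so the notification is delayed at worst rather than dropped.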
##########
server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java:
##########
@@ -128,27 +174,63 @@ private void sendWatcherResponse(Watcher<HttpContext> watcher, HttpResponseStatu
} else {
ctx.writeAndFlush(response);
}
- } else {
- // HTTP/2 response (h2c support)
- // Send headers frame
+ return;
+ }
+
+ // For HTTP/2, headers must be sent first on the initial response
+ if (sendHeaders) {
Http2Headers headers = new DefaultHttp2Headers().status(nettyStatus.codeAsText());
- headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream; charset=utf-8");
+ headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
+
ctx.write(new DefaultHttp2HeadersFrame(headers));
+ }
+
+ String group = watcher.getGroup();
+ String sse = buildSSEFormat(nettyStatus, closeStream, sendHeaders, group);
+
+ ByteBuf content = Unpooled.copiedBuffer(sse, StandardCharsets.UTF_8);
- // Send empty data frame with endStream=true to close the stream
- ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, true))
- .addListener(f -> {
- if (!f.isSuccess()) {
- logger.warn("HTTP2 response send failed, group={}", group, f.cause());
- }
- });
+ // Send DATA frame (if closeStream is true, it will end the current stream)
+ ctx.write(new DefaultHttp2DataFrame(content, closeStream));
+ ctx.flush();
+ }
+
+ private String buildSSEFormat(
+ HttpResponseStatus nettyStatus, boolean closeStream, boolean sendHeaders, String group) {
+ // Determine event type (embedded in JSON, not in SSE event field)
+ String eventType;
+ if (sendHeaders) {
+ // Send keepalive event when stream is first established to confirm connection
+ eventType = "keepalive";
+ } else if (closeStream && nettyStatus == HttpResponseStatus.NOT_MODIFIED) {
+ // Timeout event, stream needs to be closed
+ eventType = "timeout";
+ } else {
+ // Normal cluster update event
+ eventType = "cluster-update";
}
+
+ String json = String.format(
Review Comment:
Once an HTTP/2 connection is established, I would prefer that the server push
the cluster change results directly to the client, rather than requiring the
client to issue a separate request to fetch the cluster information.
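
As a rough illustration of carrying the payload on the stream itself, the sketch below frames an arbitrary string as a Server-Sent Event. It is JDK-only; `SseFrames.format` is a hypothetical helper, and the JSON shape in the usage note is made up. It also uses the standard SSE `event:` field, which is an assumption of this sketch and differs from the PR, where the event type is embedded in the JSON.

```java
// Hypothetical helper, not part of the PR.
final class SseFrames {

    /**
     * Formats one SSE event: an optional "event:" line, one "data:" line per
     * line of payload, and a terminating blank line.
     *
     * @param event event name, or null/empty to omit the "event:" line
     * @param data  payload; embedded line breaks are split across "data:" lines
     */
    static String format(String event, String data) {
        StringBuilder sb = new StringBuilder();
        if (event != null && !event.isEmpty()) {
            sb.append("event: ").append(event).append('\n');
        }
        // A raw line break inside a data line would end the field early,
        // so each payload line gets its own "data:" prefix.
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }
}
```

The server could then write something like `SseFrames.format("cluster-update", clusterJson)` into the DATA frame, where `clusterJson` would be the serialized cluster metadata, so the client can apply the change without a follow-up fetch.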
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.