This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new adfbef513c feature: Support HTTP/2 response handling for the Watch API
in Server Raft mode (#7826)
adfbef513c is described below
commit adfbef513caf246f0f300a50a771adafea503af1
Author: xiaoyu <[email protected]>
AuthorDate: Tue Dec 16 10:33:55 2025 +0800
feature: Support HTTP/2 response handling for the Watch API in Server Raft
mode (#7826)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
server/pom.xml | 1 -
.../cluster/manager/ClusterWatcherManager.java | 45 +++-
.../cluster/manager/ClusterWatcherManagerTest.java | 263 +++++++++++++++++++++
.../server/controller/ClusterControllerTest.java | 69 +++++-
6 files changed, 354 insertions(+), 26 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index d9b0327996..4f5f6d89cd 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -29,6 +29,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7664](https://github.com/apache/incubator-seata/pull/7664)] support
shentongdatabase XA mode
- [[#7675](https://github.com/apache/incubator-seata/pull/7675)] support
Oracle Batch Insert
- [[#7663](https://github.com/apache/incubator-seata/pull/7663)] add Java 25
support in CI configuration files
+- [[#7826](https://github.com/apache/incubator-seata/pull/7826)] Support
HTTP/2 response handling for the Watch API in Server Raft mode
### bugfix:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 4773516307..9b53ad0b04 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -29,6 +29,7 @@
- [[#7664](https://github.com/apache/incubator-seata/pull/7565)] 支持神通数据库的XA模式
- [[#7675](https://github.com/apache/incubator-seata/pull/7675)] 支持Oracle批量插入
- [[#7663](https://github.com/apache/incubator-seata/pull/7663)]
支持java25版本的CI流水綫
+- [[#7826](https://github.com/apache/incubator-seata/pull/7826)] 在 Server Raft
模式下,为 Watch API 提供对 HTTP/2 响应处理的支持
### bugfix:
diff --git a/server/pom.xml b/server/pom.xml
index d9efcc971b..4f1a67d2ae 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -301,7 +301,6 @@
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
- <scope>test</scope>
</dependency>
</dependencies>
diff --git
a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
index 632da039c1..600c3a9126 100644
---
a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
+++
b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
@@ -24,6 +24,10 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2Headers;
import org.apache.seata.common.rpc.http.HttpContext;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.server.cluster.listener.ClusterChangeEvent;
@@ -98,31 +102,46 @@ public class ClusterWatcherManager implements
ClusterChangeListener {
}
private void sendWatcherResponse(Watcher<HttpContext> watcher,
HttpResponseStatus nettyStatus) {
+ String group = watcher.getGroup();
HttpContext context = watcher.getAsyncContext();
if (!(context instanceof HttpContext)) {
logger.warn(
"Unsupported context type for watcher on group {}: {}",
- watcher.getGroup(),
+ group,
context != null ? context.getClass().getName() : "null");
return;
}
ChannelHandlerContext ctx = context.getContext();
+ if (!ctx.channel().isActive()) {
+ logger.warn("Netty channel is not active for watcher on group {},
cannot send response.", group);
+ return;
+ }
+
if (!context.isHttp2()) {
- if (ctx.channel().isActive()) {
- HttpResponse response =
- new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
nettyStatus, Unpooled.EMPTY_BUFFER);
- response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
+ // HTTP/1.1 response
+ HttpResponse response =
+ new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
nettyStatus, Unpooled.EMPTY_BUFFER);
+ response.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
- if (!context.isKeepAlive()) {
-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
- } else {
- ctx.writeAndFlush(response);
- }
+ if (!context.isKeepAlive()) {
+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
- logger.warn(
- "Netty channel is not active for watcher on group {},
cannot send response.",
- watcher.getGroup());
+ ctx.writeAndFlush(response);
}
+ } else {
+ // HTTP/2 response (h2c support)
+ // Send headers frame
+ Http2Headers headers = new
DefaultHttp2Headers().status(nettyStatus.codeAsText());
+ headers.set(HttpHeaderNames.CONTENT_LENGTH, "0");
+ ctx.write(new DefaultHttp2HeadersFrame(headers));
+
+ // Send empty data frame with endStream=true to close the stream
+ ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true))
+ .addListener(f -> {
+ if (!f.isSuccess()) {
+ logger.warn("HTTP2 response send failed,
group={}", group, f.cause());
+ }
+ });
}
}
diff --git
a/server/src/test/java/org/apache/seata/server/cluster/manager/ClusterWatcherManagerTest.java
b/server/src/test/java/org/apache/seata/server/cluster/manager/ClusterWatcherManagerTest.java
new file mode 100644
index 0000000000..21bb16f7ba
--- /dev/null
+++
b/server/src/test/java/org/apache/seata/server/cluster/manager/ClusterWatcherManagerTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.cluster.manager;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.seata.common.rpc.http.HttpContext;
+import org.apache.seata.server.BaseSpringBootTest;
+import org.apache.seata.server.cluster.listener.ClusterChangeEvent;
+import org.apache.seata.server.cluster.watch.Watcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class ClusterWatcherManagerTest extends BaseSpringBootTest {
+
+ private ClusterWatcherManager clusterWatcherManager;
+ private ChannelHandlerContext mockChannelHandlerContext;
+ private Channel mockChannel;
+ private HttpContext<Object> httpContext;
+
+ private static final String TEST_GROUP = "test-group";
+ private static final int TEST_TIMEOUT = 5000;
+ private static final long TEST_TERM = 1000L;
+
+ @BeforeEach
+ void setUp() {
+ clusterWatcherManager = new ClusterWatcherManager();
+
+ mockChannel = mock(Channel.class);
+ mockChannelHandlerContext = mock(ChannelHandlerContext.class);
+ when(mockChannelHandlerContext.channel()).thenReturn(mockChannel);
+
+ Object mockRequest = new Object();
+ httpContext = new HttpContext<>(mockRequest,
mockChannelHandlerContext, true, HttpContext.HTTP_1_1);
+
+ Map<String, Queue<Watcher<HttpContext>>> watchers = (Map<String,
Queue<Watcher<HttpContext>>>)
+ ReflectionTestUtils.getField(clusterWatcherManager,
"WATCHERS");
+ Map<String, Long> groupUpdateTerm =
+ (Map<String, Long>)
ReflectionTestUtils.getField(clusterWatcherManager, "GROUP_UPDATE_TERM");
+ if (watchers != null) {
+ watchers.clear();
+ }
+ if (groupUpdateTerm != null) {
+ groupUpdateTerm.clear();
+ }
+ }
+
+ @AfterEach
+ void tearDown() {
+ ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor)
+ ReflectionTestUtils.getField(clusterWatcherManager,
"scheduledThreadPoolExecutor");
+ if (executor != null && !executor.isShutdown()) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Test
+ void testSendWatcherResponseWithInactiveChannel() {
+ when(mockChannel.isActive()).thenReturn(false);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, httpContext,
TEST_TIMEOUT, TEST_TERM);
+
+ assertDoesNotThrow(() -> {
+ ReflectionTestUtils.invokeMethod(clusterWatcherManager,
"notifyWatcher", watcher);
+ });
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, never()).write(any());
+ verify(mockChannelHandlerContext, never()).writeAndFlush(any());
+ verify(mockChannelHandlerContext, never()).flush();
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testSendWatcherResponseWithActiveChannel_Http1() {
+ when(mockChannel.isActive()).thenReturn(true);
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, httpContext,
TEST_TIMEOUT, TEST_TERM);
+
+ assertDoesNotThrow(() -> {
+ ReflectionTestUtils.invokeMethod(clusterWatcherManager,
"notifyWatcher", watcher);
+ });
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, atLeastOnce()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testSendWatcherResponseWithActiveChannel_Http2() {
+ // Test normal flow with active channel (HTTP/2)
+ when(mockChannel.isActive()).thenReturn(true);
+
+ // Mock writeAndFlush to return a non-null ChannelFuture
+ ChannelFuture mockChannelFuture = mock(ChannelFuture.class);
+
when(mockChannelHandlerContext.writeAndFlush(any())).thenReturn(mockChannelFuture);
+
when(mockChannelFuture.addListener(any())).thenReturn(mockChannelFuture);
+
+ // Create HTTP/2 context
+ HttpContext<Object> http2Context =
+ new HttpContext<>(new Object(), mockChannelHandlerContext,
true, HttpContext.HTTP_2_0);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, http2Context,
TEST_TIMEOUT, TEST_TERM);
+
+ assertDoesNotThrow(() -> {
+ ReflectionTestUtils.invokeMethod(clusterWatcherManager,
"notifyWatcher", watcher);
+ });
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, atLeastOnce()).write(any());
+ verify(mockChannelHandlerContext, atLeastOnce()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testOnChangeEventWithInactiveChannel() {
+ when(mockChannel.isActive()).thenReturn(false);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, httpContext,
TEST_TIMEOUT, TEST_TERM);
+
+ clusterWatcherManager.registryWatcher(watcher);
+
+ Map<String, Queue<Watcher<HttpContext>>> watchers = (Map<String,
Queue<Watcher<HttpContext>>>)
+ ReflectionTestUtils.getField(clusterWatcherManager,
"WATCHERS");
+ assertTrue(watchers.containsKey(TEST_GROUP));
+ assertEquals(1, watchers.get(TEST_GROUP).size());
+
+ ClusterChangeEvent event = new ClusterChangeEvent(this, TEST_GROUP,
TEST_TERM + 1, true);
+
+ assertDoesNotThrow(() -> {
+ clusterWatcherManager.onChangeEvent(event);
+ });
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, never()).write(any());
+ verify(mockChannelHandlerContext, never()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testTimeoutWithInactiveChannel() throws InterruptedException {
+ when(mockChannel.isActive()).thenReturn(false);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, httpContext,
1000, TEST_TERM);
+
+ clusterWatcherManager.registryWatcher(watcher);
+
+ clusterWatcherManager.init();
+
+ Thread.sleep(2500);
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, never()).write(any());
+ verify(mockChannelHandlerContext, never()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testRegistryWatcherWithInactiveChannel() {
+ when(mockChannel.isActive()).thenReturn(false);
+
+ Map<String, Long> groupUpdateTerm =
+ (Map<String, Long>)
ReflectionTestUtils.getField(clusterWatcherManager, "GROUP_UPDATE_TERM");
+ groupUpdateTerm.put(TEST_GROUP, TEST_TERM + 10);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, httpContext,
TEST_TIMEOUT, TEST_TERM);
+
+ assertDoesNotThrow(() -> {
+ clusterWatcherManager.registryWatcher(watcher);
+ });
+
+ verify(mockChannel, atLeastOnce()).isActive();
+
+ verify(mockChannelHandlerContext, never()).write(any());
+ verify(mockChannelHandlerContext, never()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+
+ @Test
+ void testHttp2WriteAndFlushFailedShouldTriggerListener() throws Exception {
+ when(mockChannel.isActive()).thenReturn(true);
+
+ ChannelFuture mockFuture = mock(ChannelFuture.class);
+
when(mockChannelHandlerContext.writeAndFlush(any())).thenReturn(mockFuture);
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<GenericFutureListener<? extends Future<? super Void>>>
listenerCaptor =
+ ArgumentCaptor.forClass(GenericFutureListener.class);
+
+
when(mockFuture.addListener(listenerCaptor.capture())).thenReturn(mockFuture);
+
+ RuntimeException cause = new RuntimeException("mock http2 write
failed");
+ when(mockFuture.isSuccess()).thenReturn(false);
+ when(mockFuture.cause()).thenReturn(cause);
+
+ HttpContext<Object> http2Context =
+ new HttpContext<>(new Object(), mockChannelHandlerContext,
true, HttpContext.HTTP_2_0);
+
+ Watcher<HttpContext> watcher = new Watcher<>(TEST_GROUP, http2Context,
TEST_TIMEOUT, TEST_TERM);
+
+ assertDoesNotThrow(() ->
ReflectionTestUtils.invokeMethod(clusterWatcherManager, "notifyWatcher",
watcher));
+
+ GenericFutureListener<? extends Future<? super Void>> listener =
listenerCaptor.getValue();
+ assertNotNull(listener);
+
+ verify(mockChannel, atLeastOnce()).isActive();
+ verify(mockChannelHandlerContext, atLeastOnce()).write(any());
+ verify(mockChannelHandlerContext, atLeastOnce()).writeAndFlush(any());
+
+ assertTrue(watcher.isDone());
+ }
+}
diff --git
a/server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java
b/server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java
index 710b61e56b..825e423f63 100644
---
a/server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java
+++
b/server/src/test/java/org/apache/seata/server/controller/ClusterControllerTest.java
@@ -115,9 +115,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
HttpClientUtil.doPostWithHttp2(
"http://127.0.0.1:" + port +
"/metadata/v1/watch?timeout=3000", params, headers, callback);
- // Currently, the server side does not have the ability to send http2
responses,
- // so if no response is received here, it will definitely time out
- Assertions.assertFalse(latch.await(5, TimeUnit.SECONDS));
+ Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
@@ -152,7 +150,54 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(4)
+ @Order(5)
+ void watch_withHttp2() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Map<String, String> headers = new HashMap<>();
+ headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+
+ Map<String, String> params = new HashMap<>();
+ params.put("default-test", "1");
+
+ Thread thread = new Thread(() -> {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ ((ApplicationEventPublisher)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .publishEvent(new ClusterChangeEvent(this, "default-test",
2, true));
+ });
+ thread.start();
+
+ HttpCallback<Response> callback = new HttpCallback<Response>() {
+ @Override
+ public void onSuccess(Response response) {
+ Assertions.assertNotNull(response);
+ Assertions.assertEquals(Protocol.H2_PRIOR_KNOWLEDGE,
response.protocol());
+ Assertions.assertEquals(HttpStatus.SC_OK, response.code());
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ Assertions.fail("Should not fail: " + t.getMessage());
+ }
+
+ @Override
+ public void onCancelled() {
+ Assertions.fail("Should not be cancelled");
+ }
+ };
+
+ HttpClientUtil.doPostWithHttp2(
+ "http://127.0.0.1:" + port + "/metadata/v1/watch", params,
headers, callback, 30);
+ Assertions.assertTrue(latch.await(35, TimeUnit.SECONDS));
+ }
+
+ @Test
+ @Order(6)
void testXssFilterBlocked_queryParam() throws Exception {
String malicious = "<script>alert('xss')</script>";
Map<String, String> header = new HashMap<>();
@@ -169,7 +214,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(5)
+ @Order(7)
void testXssFilterBlocked_queryParam_withGetHttp2() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -208,7 +253,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(6)
+ @Order(8)
void testXssFilterBlocked_formParam_withPostHttp2() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -245,7 +290,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(7)
+ @Order(9)
void testXssFilterBlocked_bodyParam_withPostHttp2() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -280,7 +325,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(8)
+ @Order(10)
void testXssFilterBlocked_formParam() throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
@@ -296,7 +341,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(9)
+ @Order(11)
void testXssFilterBlocked_jsonBody() throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_JSON.getMimeType());
@@ -311,7 +356,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(10)
+ @Order(12)
void testXssFilterBlocked_headerParam() throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
@@ -328,7 +373,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(11)
+ @Order(13)
void testXssFilterBlocked_multiSource() throws Exception {
Map<String, String> headers = new HashMap<>();
headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_JSON.getMimeType());
@@ -348,7 +393,7 @@ class ClusterControllerTest extends BaseSpringBootTest {
}
@Test
- @Order(12)
+ @Order(14)
void testXssFilterBlocked_formParamWithUserCustomKeyWords() throws
Exception {
Map<String, String> headers = new HashMap<>();
headers.put(HTTP.CONTENT_TYPE,
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]