Copilot commented on code in PR #7783:
URL: https://github.com/apache/incubator-seata/pull/7783#discussion_r2645049500
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket()
throws Exception {
}
}
+ @Test
+ public void testGetXidFromGlobalRollbackRequest() {
+ GlobalRollbackRequest request = new GlobalRollbackRequest();
+ request.setXid("rollback-xid-67890");
+ String xid = client.getXid(request);
+ assertEquals("rollback-xid-67890", xid);
+ }
+
+ @Test
+ public void testGetXidFromBranchReportRequest() {
+ BranchReportRequest request = new BranchReportRequest();
+ request.setXid("report-xid-54321");
+ String xid = client.getXid(request);
+ assertEquals("report-xid-54321", xid);
+ }
+
+ @Test
+ public void testReconnectTimerException() {
+ TestNettyRemotingClient client = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ // mock clientChannelManager.reconnect exception
+ NettyClientChannelManager mockManager =
mock(NettyClientChannelManager.class);
+ doThrow(new RuntimeException("forced
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+ Field managerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ managerField.setAccessible(true);
+ managerField.set(client, mockManager);
+
+ client.init();
+ Thread.sleep(100);
Review Comment:
The test sleeps for only 100 ms, which may not be long enough for the
reconnect timer to fire, especially on slower CI systems. This could lead to
flaky tests. Consider either increasing the sleep duration or using a more
reliable synchronization mechanism to verify the timer behavior (for example,
a CountDownLatch counted down by the mocked reconnect call, or Mockito's
verify(mockManager, timeout(...)) on reconnect).
```suggestion
TimeUnit.SECONDS.sleep(1);
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
+ }
+
+ @Test
+ public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+ class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+ public TestClientWithEmptyServiceGroup(NettyClientConfig config,
ThreadPoolExecutor executor) {
+ super(config, executor);
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return "";
+ }
+ }
+
+ TestClientWithEmptyServiceGroup testClient = new
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+ assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+ }
+
+ @Test
+ public void testReconnectTaskThrowException() throws Exception {
+ TestNettyRemotingClientWithReconnectException exceptionClient =
+ new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
+
+ assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Review Comment:
Neither testClient (in testReconnectTaskWithEmptyServiceGroup) nor
exceptionClient (in testReconnectTaskThrowException) is ever destroyed. Even
though the reconnect task is run directly without calling init(), each client
still has resources (like the reconnectLock and transactionRole) that should
be cleaned up. Add proper cleanup (try/finally with destroy()) to both tests
to avoid potential resource leaks.
```suggestion
final TestClientWithEmptyServiceGroup testClient =
new TestClientWithEmptyServiceGroup(clientConfig,
messageExecutor);
Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
try {
assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
} finally {
testClient.destroy();
}
}
@Test
public void testReconnectTaskThrowException() throws Exception {
final TestNettyRemotingClientWithReconnectException exceptionClient =
new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
try {
assertDoesNotThrow(reconnectTask::run, "reconnectTask
exception");
} finally {
exceptionClient.destroy();
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
Review Comment:
The assertion message contains mixed English and Chinese characters: the
string "test failed：" ends with a full-width Chinese colon. For consistency
with the rest of the codebase, which uses English, the colon should be the
ASCII colon character instead of the full-width Chinese colon.
```suggestion
fail("test failed: " + e.getMessage());
```
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -155,6 +161,19 @@ public AbstractNettyRemotingClient(
clientBootstrap.setChannelHandlers(new ClientHandler(), new
ChannelEventHandler(this));
clientChannelManager = new NettyClientChannelManager(
new NettyPoolableFactory(this, clientBootstrap),
getPoolKeyFunction(), nettyClientConfig);
+ this.reconnectTask = () -> {
+ if (!timerStarted.get()) {
+ return;
+ }
+ try {
+ String serviceGroup = getTransactionServiceGroup();
+ if (StringUtils.isNotBlank(serviceGroup)) {
+ clientChannelManager.reconnect(serviceGroup);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Reconnect task failed for role: {}",
transactionRole.name(), t);
+ }
+ };
Review Comment:
The reconnectTask checks the timerStarted flag at the beginning and returns
early if it is false. However, there's a potential race condition: if
destroy() is called and sets timerStarted to false while reconnectTask is
already past the check (the `!timerStarted.get()` test, line 165 of
AbstractNettyRemotingClient.java) but before executing reconnect (the
`clientChannelManager.reconnect(serviceGroup)` call, line 171), the reconnect
could still execute after destroy. Consider checking the timerStarted flag
again after acquiring any necessary resources or before the actual reconnect
call.
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -271,11 +290,19 @@ public void destroyChannel(String serverAddress, Channel
channel) {
@Override
public void destroy() {
- clientBootstrap.shutdown();
- if (mergeSendExecutorService != null) {
- mergeSendExecutorService.shutdown();
+ reconnectLock.lock();
+ try {
+ if (timerStarted.compareAndSet(true, false)) {
+ LOGGER.info("Reconnect timer stopped (role: {})",
transactionRole.name());
+ }
+ clientBootstrap.shutdown();
+ if (mergeSendExecutorService != null) {
+ mergeSendExecutorService.shutdown();
+ }
+ super.destroy();
Review Comment:
After setting timerStarted to false, there's no synchronization to ensure
that any in-flight reconnectTask executions have completed before calling
super.destroy(), which shuts down the timerExecutor. This could cause the
scheduled reconnectTask to throw RejectedExecutionException or attempt to use
resources that are being shut down. Consider calling cancel() on the
scheduled future (shutdownNow() is a method of the executor, not of the
future) or adding a small delay to allow in-flight tasks to complete.
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
+ }
+
+ @Test
+ public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+ class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+ public TestClientWithEmptyServiceGroup(NettyClientConfig config,
ThreadPoolExecutor executor) {
+ super(config, executor);
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return "";
+ }
+ }
+
+ TestClientWithEmptyServiceGroup testClient = new
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+ assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+ }
+
+ @Test
+ public void testReconnectTaskThrowException() throws Exception {
+ TestNettyRemotingClientWithReconnectException exceptionClient =
+ new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
+
+ assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+ }
+
+ @Test
+ public void testInitClientBootstrapStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ testClient.init();
+ verify(mockBootstrap, times(1)).start();
+ }
+
+ static class TestReconnectTaskClient extends TestNettyRemotingClient {
+ private final String transactionServiceGroup;
+
+ public TestReconnectTaskClient(NettyClientConfig config,
ThreadPoolExecutor executor, String serviceGroup) {
+ super(config, executor);
+ this.transactionServiceGroup = serviceGroup;
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return this.transactionServiceGroup;
+ }
+ }
+
+ @Test
+ public void testReconnectTask() throws Exception {
+ String testServiceGroup = "test-group";
+ NettyClientConfig clientConfig = new NettyClientConfig();
+ ThreadPoolExecutor messageExecutor =
+ new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
+ // Mock clientChannelManager
+ NettyClientChannelManager mockChannelManager =
mock(NettyClientChannelManager.class);
+ Field transactionRoleField =
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
+ transactionRoleField.setAccessible(true);
+
+ TestReconnectTaskClient client1 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted1 = (AtomicBoolean)
timerStartedField.get(client1);
+ timerStarted1.set(false);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
+
+ reconnectTask1.run();
+ verify(mockChannelManager, never()).reconnect(anyString());
+
+ TestReconnectTaskClient client2 = new
TestReconnectTaskClient(clientConfig, messageExecutor, "");
+ AtomicBoolean timerStarted2 = (AtomicBoolean)
timerStartedField.get(client2);
+ timerStarted2.set(true);
+ Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ channelManagerField.setAccessible(true);
+ channelManagerField.set(client2, mockChannelManager);
+
+ // verify reconnect
+ Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
+ reconnectTask2.run();
+ verify(mockChannelManager, never()).reconnect(anyString());
+
+ TestReconnectTaskClient client3 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ AtomicBoolean timerStarted3 = (AtomicBoolean)
timerStartedField.get(client3);
+ timerStarted3.set(true);
+ channelManagerField.set(client3, mockChannelManager);
+
+ Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
+ reconnectTask3.run();
+ verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
+
+ TestReconnectTaskClient client4 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ AtomicBoolean timerStarted4 = (AtomicBoolean)
timerStartedField.get(client4);
+ timerStarted4.set(true);
+ RuntimeException testEx = new RuntimeException("test-reconnect-error");
+ doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
+ channelManagerField.set(client4, mockChannelManager);
+
+ Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
+ assertDoesNotThrow(reconnectTask4::run);
Review Comment:
The test creates multiple client instances (client1, client2, client3,
client4) but doesn't properly clean them up. Each client has scheduled timers
and potentially other resources that should be destroyed. Add proper cleanup in
a finally block or use @AfterEach to ensure all test clients are destroyed.
```suggestion
TestReconnectTaskClient client1 = null;
TestReconnectTaskClient client2 = null;
TestReconnectTaskClient client3 = null;
TestReconnectTaskClient client4 = null;
try {
// Mock clientChannelManager
NettyClientChannelManager mockChannelManager =
mock(NettyClientChannelManager.class);
Field transactionRoleField =
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
transactionRoleField.setAccessible(true);
client1 = new TestReconnectTaskClient(clientConfig,
messageExecutor, testServiceGroup);
Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted1 = (AtomicBoolean)
timerStartedField.get(client1);
timerStarted1.set(false);
Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask1 = (Runnable)
reconnectTaskField.get(client1);
reconnectTask1.run();
verify(mockChannelManager, never()).reconnect(anyString());
client2 = new TestReconnectTaskClient(clientConfig,
messageExecutor, "");
AtomicBoolean timerStarted2 = (AtomicBoolean)
timerStartedField.get(client2);
timerStarted2.set(true);
Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
channelManagerField.setAccessible(true);
channelManagerField.set(client2, mockChannelManager);
// verify reconnect
Runnable reconnectTask2 = (Runnable)
reconnectTaskField.get(client2);
reconnectTask2.run();
verify(mockChannelManager, never()).reconnect(anyString());
client3 = new TestReconnectTaskClient(clientConfig,
messageExecutor, testServiceGroup);
AtomicBoolean timerStarted3 = (AtomicBoolean)
timerStartedField.get(client3);
timerStarted3.set(true);
channelManagerField.set(client3, mockChannelManager);
Runnable reconnectTask3 = (Runnable)
reconnectTaskField.get(client3);
reconnectTask3.run();
verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
client4 = new TestReconnectTaskClient(clientConfig,
messageExecutor, testServiceGroup);
AtomicBoolean timerStarted4 = (AtomicBoolean)
timerStartedField.get(client4);
timerStarted4.set(true);
RuntimeException testEx = new
RuntimeException("test-reconnect-error");
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
channelManagerField.set(client4, mockChannelManager);
Runnable reconnectTask4 = (Runnable)
reconnectTaskField.get(client4);
assertDoesNotThrow(reconnectTask4::run);
} finally {
if (client1 != null) {
client1.destroy();
}
if (client2 != null) {
client2.destroy();
}
if (client3 != null) {
client3.destroy();
}
if (client4 != null) {
client4.destroy();
}
messageExecutor.shutdownNow();
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket()
throws Exception {
}
}
+ @Test
+ public void testGetXidFromGlobalRollbackRequest() {
+ GlobalRollbackRequest request = new GlobalRollbackRequest();
+ request.setXid("rollback-xid-67890");
+ String xid = client.getXid(request);
+ assertEquals("rollback-xid-67890", xid);
+ }
+
+ @Test
+ public void testGetXidFromBranchReportRequest() {
+ BranchReportRequest request = new BranchReportRequest();
+ request.setXid("report-xid-54321");
+ String xid = client.getXid(request);
+ assertEquals("report-xid-54321", xid);
+ }
+
+ @Test
+ public void testReconnectTimerException() {
+ TestNettyRemotingClient client = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ // mock clientChannelManager.reconnect exception
+ NettyClientChannelManager mockManager =
mock(NettyClientChannelManager.class);
+ doThrow(new RuntimeException("forced
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+ Field managerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ managerField.setAccessible(true);
+ managerField.set(client, mockManager);
+
+ client.init();
+ Thread.sleep(100);
+ } catch (Exception e) {
+ fail("Reconnect timer exception test failed: " + e.getMessage());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetXidWithoutXidField() {
+ class NoXidMessage extends AbstractMessage {
+ private String name;
+
+ public NoXidMessage(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public short getTypeCode() {
+ return 0;
+ }
+ }
+
+ NoXidMessage msg = new NoXidMessage("test-no-xid");
+ String xid1 = client.getXid(msg);
+ String xid2 = client.getXid(msg);
+
+ assertNotNull(xid1);
+ assertNotNull(xid2);
+ Assertions.assertNotEquals(xid1, xid2);
+ }
+
+ @Test
+ public void testInitAndDestroyMultipleTimes() {
+ TestNettyRemotingClient multiClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ try {
+ // initiate first
+ multiClient.init();
+
+ // verify status
+ assertNotNull(multiClient);
Review Comment:
The assertion 'assertNotNull(multiClient)' on line 1564 is not meaningful
since multiClient was just created on line 1557 and would never be null at this
point. Consider replacing this with a more meaningful assertion that verifies
the client is properly initialized, such as checking the timerStarted flag or
verifying that the timer has been scheduled.
```suggestion
// verify status: timer should be started after init
try {
Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(multiClient);
assertNotNull(timerStarted);
assertTrue(timerStarted.get());
} catch (NoSuchFieldException | IllegalAccessException e) {
Assertions.fail("Failed to access timerStarted field on
AbstractNettyRemotingClient", e);
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
+ }
+
+ @Test
+ public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+ class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+ public TestClientWithEmptyServiceGroup(NettyClientConfig config,
ThreadPoolExecutor executor) {
+ super(config, executor);
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return "";
+ }
+ }
+
+ TestClientWithEmptyServiceGroup testClient = new
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+ assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+ }
+
+ @Test
+ public void testReconnectTaskThrowException() throws Exception {
+ TestNettyRemotingClientWithReconnectException exceptionClient =
+ new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
+
+ assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+ }
+
+ @Test
+ public void testInitClientBootstrapStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ testClient.init();
+ verify(mockBootstrap, times(1)).start();
Review Comment:
The testClient is initialized but not destroyed in a finally block. If an
exception occurs during field access or assertions, the client's resources
(timers, executors) won't be cleaned up properly. Add a finally block to ensure
cleanup.
```suggestion
try {
Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
NettyClientBootstrap mockBootstrap =
mock(NettyClientBootstrap.class);
clientBootstrapField.set(testClient, mockBootstrap);
testClient.init();
verify(mockBootstrap, times(1)).start();
} finally {
testClient.destroy();
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
Review Comment:
The batchClient is initialized but never explicitly destroyed in a finally
block, which could lead to resource leaks if the test fails. Consider adding a
finally block to ensure proper cleanup of the client and its resources (timer
executor, merge send executor, etc.).
```suggestion
try {
batchClient.init();
ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the
type of thread should be ThreadPoolExecutor");
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)
mergeSendExecutor;
assertEquals(1, threadPool.getCorePoolSize(), "core threadPool
should be MAX_MERGE_SEND_THREAD");
assertEquals(1, threadPool.getMaximumPoolSize(), "the max
threadPool should be MAX_MERGE_SEND_THREAD");
} finally {
batchClient.destroy();
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
Review Comment:
Similar to the previous test, testClient is initialized but never destroyed,
potentially leaking resources. Additionally, the test doesn't clean up the mock
bootstrap that was injected. Consider adding proper cleanup in a finally block.
```suggestion
} finally {
if (testClient != null) {
testClient.destroy();
}
}
}
@Test
public void testInitReconnectTimerStart() throws Exception {
TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
NettyClientBootstrap originalBootstrap = null;
try {
NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
originalBootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
clientBootstrapField.set(testClient, mockBootstrap);
Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
timerStartedField.setAccessible(true);
AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
assertFalse(timerStarted.get(), "reconnect is not started");
testClient.init();
assertTrue(timerStarted.get(), "reconnect starts");
Mockito.verify(mockBootstrap, Mockito.times(1)).start();
Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
timerExecutorField.setAccessible(true);
ScheduledExecutorService timerExecutor =
(ScheduledExecutorService) timerExecutorField.get(testClient);
assertNotNull(timerExecutor, "thread can not be null");
} finally {
try {
Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
clientBootstrapField.setAccessible(true);
clientBootstrapField.set(testClient, originalBootstrap);
} catch (Exception ignore) {
// ignore cleanup exceptions in test
}
if (testClient != null) {
testClient.destroy();
}
}
```
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
+ }
+
+ @Test
+ public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+ class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+ public TestClientWithEmptyServiceGroup(NettyClientConfig config,
ThreadPoolExecutor executor) {
+ super(config, executor);
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return "";
+ }
+ }
+
+ TestClientWithEmptyServiceGroup testClient = new
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+ assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+ }
+
+ @Test
+ public void testReconnectTaskThrowException() throws Exception {
+ TestNettyRemotingClientWithReconnectException exceptionClient =
+ new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
+
+ assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
Review Comment:
The exceptionClient is created but never destroyed. Add proper cleanup in a
finally block to ensure resources are released even if the test fails.
```suggestion
TestNettyRemotingClientWithReconnectException exceptionClient = null;
try {
exceptionClient = new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
reconnectTaskField.setAccessible(true);
Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
assertDoesNotThrow(reconnectTask::run, "reconnectTask
exception");
} finally {
if (exceptionClient != null) {
exceptionClient.destroy();
}
}
```
##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -120,29 +125,30 @@ public abstract class AbstractNettyRemotingClient extends
AbstractNettyRemoting
@Override
public void init() {
- timerExecutor.scheduleAtFixedRate(
- () -> {
- try {
-
clientChannelManager.reconnect(getTransactionServiceGroup());
- } catch (Exception ex) {
- LOGGER.warn("reconnect server failed. {}",
ex.getMessage());
- }
- },
- SCHEDULE_DELAY_MILLS,
- SCHEDULE_INTERVAL_MILLS,
- TimeUnit.MILLISECONDS);
- if (this.isEnableClientBatchSendRequest()) {
- mergeSendExecutorService = new ThreadPoolExecutor(
- MAX_MERGE_SEND_THREAD,
- MAX_MERGE_SEND_THREAD,
- KEEP_ALIVE_TIME,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
- new NamedThreadFactory(getThreadPrefix(),
MAX_MERGE_SEND_THREAD));
- mergeSendExecutorService.submit(new MergedSendRunnable());
+ reconnectLock.lock();
+ try {
+ if (timerStarted.compareAndSet(false, true)) {
+ timerExecutor.scheduleAtFixedRate(
+ reconnectTask, SCHEDULE_DELAY_MILLS,
SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
+ LOGGER.info("Reconnect timer started (role: {})",
transactionRole.name());
+ }
+
+ if (this.isEnableClientBatchSendRequest()) {
+ mergeSendExecutorService = new ThreadPoolExecutor(
+ MAX_MERGE_SEND_THREAD,
+ MAX_MERGE_SEND_THREAD,
+ KEEP_ALIVE_TIME,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory(getThreadPrefix(),
MAX_MERGE_SEND_THREAD));
+ mergeSendExecutorService.submit(new MergedSendRunnable());
+ }
Review Comment:
The mergeSendExecutorService can be initialized multiple times if init() is
called multiple times. While the timerStarted flag prevents duplicate timer
scheduling, there's no similar protection for mergeSendExecutorService. This
could lead to thread pool leaks if init() is called multiple times without
calling destroy() in between. Consider adding a check to prevent
re-initialization of the executor service, similar to the timer protection.
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket()
throws Exception {
}
}
+ @Test
+ public void testGetXidFromGlobalRollbackRequest() {
+ GlobalRollbackRequest request = new GlobalRollbackRequest();
+ request.setXid("rollback-xid-67890");
+ String xid = client.getXid(request);
+ assertEquals("rollback-xid-67890", xid);
+ }
+
+ @Test
+ public void testGetXidFromBranchReportRequest() {
+ BranchReportRequest request = new BranchReportRequest();
+ request.setXid("report-xid-54321");
+ String xid = client.getXid(request);
+ assertEquals("report-xid-54321", xid);
+ }
+
+ @Test
+ public void testReconnectTimerException() {
+ TestNettyRemotingClient client = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ // mock clientChannelManager.reconnect exception
+ NettyClientChannelManager mockManager =
mock(NettyClientChannelManager.class);
+ doThrow(new RuntimeException("forced
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+ Field managerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ managerField.setAccessible(true);
+ managerField.set(client, mockManager);
+
+ client.init();
+ Thread.sleep(100);
+ } catch (Exception e) {
+ fail("Reconnect timer exception test failed: " + e.getMessage());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetXidWithoutXidField() {
+ class NoXidMessage extends AbstractMessage {
+ private String name;
+
+ public NoXidMessage(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public short getTypeCode() {
+ return 0;
+ }
+ }
+
+ NoXidMessage msg = new NoXidMessage("test-no-xid");
+ String xid1 = client.getXid(msg);
+ String xid2 = client.getXid(msg);
+
+ assertNotNull(xid1);
+ assertNotNull(xid2);
+ Assertions.assertNotEquals(xid1, xid2);
+ }
+
+ @Test
+ public void testInitAndDestroyMultipleTimes() {
+ TestNettyRemotingClient multiClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ try {
+ // initiate first
+ multiClient.init();
+
+ // verify status
+ assertNotNull(multiClient);
+
+ multiClient.destroy();
+ multiClient.destroy();
+
+ } finally {
+ // clean
+ try {
+ multiClient.destroy();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
Review Comment:
The test verifies that calling destroy() multiple times doesn't cause
issues, but it doesn't verify that a subsequent call to init() after destroy()
works correctly. This is important since the timerStarted flag might prevent
re-initialization if destroy() does not reset it. Consider adding a test that
exercises an init(), destroy(), init() sequence to ensure the client can be
reinitialized properly.
##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager
getClientChannelManager() {
return mockChannelManager != null ? mockChannelManager :
super.getClientChannelManager();
}
}
+
+ @Test
+ public void testConstructorCoreInitialization() {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ try {
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap bootstrap = (NettyClientBootstrap)
clientBootstrapField.get(testClient);
+ assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+ Field clientChannelManagerField =
+
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ clientChannelManagerField.setAccessible(true);
+ NettyClientChannelManager channelManager =
+ (NettyClientChannelManager)
clientChannelManagerField.get(testClient);
+ assertNotNull(channelManager, "fail to init
NettyClientChannelManager");
+
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(testClient);
+ assertNotNull(reconnectTask, "fail to init reconnectTask");
+ } catch (Exception e) {
+ fail("test failed:" + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInitReconnectTimerStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+ NettyClientBootstrap mockBootstrap =
Mockito.mock(NettyClientBootstrap.class);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted = (AtomicBoolean)
timerStartedField.get(testClient);
+ assertFalse(timerStarted.get(), "reconnect is not started");
+
+ testClient.init();
+
+ assertTrue(timerStarted.get(), "reconnect starts");
+
+ Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+ Field timerExecutorField =
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+ timerExecutorField.setAccessible(true);
+ ScheduledExecutorService timerExecutor = (ScheduledExecutorService)
timerExecutorField.get(testClient);
+ assertNotNull(timerExecutor, "thread can not be null");
+ }
+
+ @Test
+ public void testInitMergeSendExecutorService() throws Exception {
+ TestNettyRemotingClientWithBatch batchClient =
+ new TestNettyRemotingClientWithBatch(clientConfig,
messageExecutor);
+ Field mergeSendExecutorField =
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+ mergeSendExecutorField.setAccessible(true);
+ assertNull(mergeSendExecutorField.get(batchClient), "thread should be
null");
+
+ batchClient.init();
+ ExecutorService mergeSendExecutor = (ExecutorService)
mergeSendExecutorField.get(batchClient);
+ assertNotNull(mergeSendExecutor, "mergeSendExecutor should be
initialized");
+ assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type
of thread should be ThreadPoolExecutor");
+
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+ assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should
be MAX_MERGE_SEND_THREAD");
+ assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool
should be MAX_MERGE_SEND_THREAD");
+ }
+
+ @Test
+ public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+ class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+ public TestClientWithEmptyServiceGroup(NettyClientConfig config,
ThreadPoolExecutor executor) {
+ super(config, executor);
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return "";
+ }
+ }
+
+ TestClientWithEmptyServiceGroup testClient = new
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+ assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+ }
+
+ @Test
+ public void testReconnectTaskThrowException() throws Exception {
+ TestNettyRemotingClientWithReconnectException exceptionClient =
+ new
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask = (Runnable)
reconnectTaskField.get(exceptionClient);
+
+ assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+ }
+
+ @Test
+ public void testInitClientBootstrapStart() throws Exception {
+ TestNettyRemotingClient testClient = new
TestNettyRemotingClient(clientConfig, messageExecutor);
+ Field clientBootstrapField =
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+ clientBootstrapField.setAccessible(true);
+ NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+ clientBootstrapField.set(testClient, mockBootstrap);
+
+ testClient.init();
+ verify(mockBootstrap, times(1)).start();
+ }
+
+ static class TestReconnectTaskClient extends TestNettyRemotingClient {
+ private final String transactionServiceGroup;
+
+ public TestReconnectTaskClient(NettyClientConfig config,
ThreadPoolExecutor executor, String serviceGroup) {
+ super(config, executor);
+ this.transactionServiceGroup = serviceGroup;
+ }
+
+ @Override
+ protected String getTransactionServiceGroup() {
+ return this.transactionServiceGroup;
+ }
+ }
+
+ @Test
+ public void testReconnectTask() throws Exception {
+ String testServiceGroup = "test-group";
+ NettyClientConfig clientConfig = new NettyClientConfig();
+ ThreadPoolExecutor messageExecutor =
+ new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new
LinkedBlockingQueue<>());
+ // Mock clientChannelManager
+ NettyClientChannelManager mockChannelManager =
mock(NettyClientChannelManager.class);
+ Field transactionRoleField =
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
+ transactionRoleField.setAccessible(true);
+
+ TestReconnectTaskClient client1 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ Field timerStartedField =
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+ timerStartedField.setAccessible(true);
+ AtomicBoolean timerStarted1 = (AtomicBoolean)
timerStartedField.get(client1);
+ timerStarted1.set(false);
+ Field reconnectTaskField =
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+ reconnectTaskField.setAccessible(true);
+ Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
+
+ reconnectTask1.run();
+ verify(mockChannelManager, never()).reconnect(anyString());
+
+ TestReconnectTaskClient client2 = new
TestReconnectTaskClient(clientConfig, messageExecutor, "");
+ AtomicBoolean timerStarted2 = (AtomicBoolean)
timerStartedField.get(client2);
+ timerStarted2.set(true);
+ Field channelManagerField =
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+ channelManagerField.setAccessible(true);
+ channelManagerField.set(client2, mockChannelManager);
+
+ // verify reconnect
+ Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
+ reconnectTask2.run();
+ verify(mockChannelManager, never()).reconnect(anyString());
+
+ TestReconnectTaskClient client3 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ AtomicBoolean timerStarted3 = (AtomicBoolean)
timerStartedField.get(client3);
+ timerStarted3.set(true);
+ channelManagerField.set(client3, mockChannelManager);
+
+ Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
+ reconnectTask3.run();
+ verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
+
+ TestReconnectTaskClient client4 = new
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+ AtomicBoolean timerStarted4 = (AtomicBoolean)
timerStartedField.get(client4);
+ timerStarted4.set(true);
+ RuntimeException testEx = new RuntimeException("test-reconnect-error");
+ doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
+ channelManagerField.set(client4, mockChannelManager);
+
+ Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
+ assertDoesNotThrow(reconnectTask4::run);
+ }
Review Comment:
This test creates four separate client instances but shares a single
mockChannelManager among them (client1 never has the mock injected at all, so
its never() verification passes trivially). As a result, the verify() calls
check the cumulative behavior across clients rather than each client's own
behavior. Each client should have its own mock instance so that every scenario
is isolated and verified independently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]