Copilot commented on code in PR #7783:
URL: https://github.com/apache/incubator-seata/pull/7783#discussion_r2645049500


##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket() 
throws Exception {
         }
     }
 
+    @Test
+    public void testGetXidFromGlobalRollbackRequest() {
+        GlobalRollbackRequest request = new GlobalRollbackRequest();
+        request.setXid("rollback-xid-67890");
+        String xid = client.getXid(request);
+        assertEquals("rollback-xid-67890", xid);
+    }
+
+    @Test
+    public void testGetXidFromBranchReportRequest() {
+        BranchReportRequest request = new BranchReportRequest();
+        request.setXid("report-xid-54321");
+        String xid = client.getXid(request);
+        assertEquals("report-xid-54321", xid);
+    }
+
+    @Test
+    public void testReconnectTimerException() {
+        TestNettyRemotingClient client = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            // mock clientChannelManager.reconnect exception
+            NettyClientChannelManager mockManager = 
mock(NettyClientChannelManager.class);
+            doThrow(new RuntimeException("forced 
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+            Field managerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            managerField.setAccessible(true);
+            managerField.set(client, mockManager);
+
+            client.init();
+            Thread.sleep(100);

Review Comment:
   The test sleeps for 100ms which may not be sufficient for the reconnect 
timer to execute, especially on slower CI systems. This could lead to flaky 
tests. Consider either increasing the sleep duration or using a more reliable 
synchronization mechanism to verify the timer behavior.
   ```suggestion
               TimeUnit.SECONDS.sleep(1);
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");
+    }
+
+    @Test
+    public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+        class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+            public TestClientWithEmptyServiceGroup(NettyClientConfig config, 
ThreadPoolExecutor executor) {
+                super(config, executor);
+            }
+
+            @Override
+            protected String getTransactionServiceGroup() {
+                return "";
+            }
+        }
+
+        TestClientWithEmptyServiceGroup testClient = new 
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+        assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+    }
+
+    @Test
+    public void testReconnectTaskThrowException() throws Exception {
+        TestNettyRemotingClientWithReconnectException exceptionClient =
+                new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
+
+        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");

Review Comment:
   The testClient is created but never destroyed. Even though the reconnect 
task is run directly without calling init(), the client still has resources 
(like the reconnectLock and transactionRole) that should be cleaned up. Add 
proper cleanup to avoid potential resource leaks.
   ```suggestion
           final TestClientWithEmptyServiceGroup testClient =
                   new TestClientWithEmptyServiceGroup(clientConfig, 
messageExecutor);
           Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
           reconnectTaskField.setAccessible(true);
           Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
   
           try {
               assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
           } finally {
               testClient.destroy();
           }
       }
   
       @Test
       public void testReconnectTaskThrowException() throws Exception {
           final TestNettyRemotingClientWithReconnectException exceptionClient =
                   new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
           Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
           reconnectTaskField.setAccessible(true);
           Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
   
           try {
               assertDoesNotThrow(reconnectTask::run, "reconnectTask 
exception");
           } finally {
               exceptionClient.destroy();
           }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());

Review Comment:
   The assertion message contains mixed English and Chinese characters ("test 
failed:"). For consistency with the rest of the codebase which uses English, 
the colon should be the ASCII colon character instead of the full-width Chinese 
colon.
   ```suggestion
               fail("test failed: " + e.getMessage());
   ```



##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -155,6 +161,19 @@ public AbstractNettyRemotingClient(
         clientBootstrap.setChannelHandlers(new ClientHandler(), new 
ChannelEventHandler(this));
         clientChannelManager = new NettyClientChannelManager(
                 new NettyPoolableFactory(this, clientBootstrap), 
getPoolKeyFunction(), nettyClientConfig);
+        this.reconnectTask = () -> {
+            if (!timerStarted.get()) {
+                return;
+            }
+            try {
+                String serviceGroup = getTransactionServiceGroup();
+                if (StringUtils.isNotBlank(serviceGroup)) {
+                    clientChannelManager.reconnect(serviceGroup);
+                }
+            } catch (Throwable t) {
+                LOGGER.error("Reconnect task failed for role: {}", 
transactionRole.name(), t);
+            }
+        };

Review Comment:
   The reconnectTask checks timerStarted flag at the beginning and returns 
early if false. However, there's a potential race condition: if destroy() is 
called and sets timerStarted to false while reconnectTask is already past the 
check (line 165) but before executing reconnect (line 171), the reconnect could 
still execute after destroy. Consider checking the timerStarted flag again 
after acquiring any necessary resources or before the actual reconnect call.



##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -271,11 +290,19 @@ public void destroyChannel(String serverAddress, Channel 
channel) {
 
     @Override
     public void destroy() {
-        clientBootstrap.shutdown();
-        if (mergeSendExecutorService != null) {
-            mergeSendExecutorService.shutdown();
+        reconnectLock.lock();
+        try {
+            if (timerStarted.compareAndSet(true, false)) {
+                LOGGER.info("Reconnect timer stopped (role: {})", 
transactionRole.name());
+            }
+            clientBootstrap.shutdown();
+            if (mergeSendExecutorService != null) {
+                mergeSendExecutorService.shutdown();
+            }
+            super.destroy();

Review Comment:
   After setting timerStarted to false, there's no synchronization to ensure 
that any in-flight reconnectTask executions have completed before calling 
super.destroy() which shuts down the timerExecutor. This could cause the 
scheduled reconnectTask to throw RejectedExecutionException or attempt to use 
resources that are being shut down. Consider using shutdownNow() on the 
scheduled future or adding a small delay to allow in-flight tasks to complete.



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");
+    }
+
+    @Test
+    public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+        class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+            public TestClientWithEmptyServiceGroup(NettyClientConfig config, 
ThreadPoolExecutor executor) {
+                super(config, executor);
+            }
+
+            @Override
+            protected String getTransactionServiceGroup() {
+                return "";
+            }
+        }
+
+        TestClientWithEmptyServiceGroup testClient = new 
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+        assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+    }
+
+    @Test
+    public void testReconnectTaskThrowException() throws Exception {
+        TestNettyRemotingClientWithReconnectException exceptionClient =
+                new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
+
+        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+    }
+
+    @Test
+    public void testInitClientBootstrapStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        testClient.init();
+        verify(mockBootstrap, times(1)).start();
+    }
+
+    static class TestReconnectTaskClient extends TestNettyRemotingClient {
+        private final String transactionServiceGroup;
+
+        public TestReconnectTaskClient(NettyClientConfig config, 
ThreadPoolExecutor executor, String serviceGroup) {
+            super(config, executor);
+            this.transactionServiceGroup = serviceGroup;
+        }
+
+        @Override
+        protected String getTransactionServiceGroup() {
+            return this.transactionServiceGroup;
+        }
+    }
+
+    @Test
+    public void testReconnectTask() throws Exception {
+        String testServiceGroup = "test-group";
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        ThreadPoolExecutor messageExecutor =
+                new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
+        // Mock clientChannelManager
+        NettyClientChannelManager mockChannelManager = 
mock(NettyClientChannelManager.class);
+        Field transactionRoleField = 
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
+        transactionRoleField.setAccessible(true);
+
+        TestReconnectTaskClient client1 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted1 = (AtomicBoolean) 
timerStartedField.get(client1);
+        timerStarted1.set(false);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
+
+        reconnectTask1.run();
+        verify(mockChannelManager, never()).reconnect(anyString());
+
+        TestReconnectTaskClient client2 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, "");
+        AtomicBoolean timerStarted2 = (AtomicBoolean) 
timerStartedField.get(client2);
+        timerStarted2.set(true);
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        channelManagerField.set(client2, mockChannelManager);
+
+        // verify reconnect
+        Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
+        reconnectTask2.run();
+        verify(mockChannelManager, never()).reconnect(anyString());
+
+        TestReconnectTaskClient client3 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        AtomicBoolean timerStarted3 = (AtomicBoolean) 
timerStartedField.get(client3);
+        timerStarted3.set(true);
+        channelManagerField.set(client3, mockChannelManager);
+
+        Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
+        reconnectTask3.run();
+        verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
+
+        TestReconnectTaskClient client4 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        AtomicBoolean timerStarted4 = (AtomicBoolean) 
timerStartedField.get(client4);
+        timerStarted4.set(true);
+        RuntimeException testEx = new RuntimeException("test-reconnect-error");
+        doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
+        channelManagerField.set(client4, mockChannelManager);
+
+        Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
+        assertDoesNotThrow(reconnectTask4::run);

Review Comment:
   The test creates multiple client instances (client1, client2, client3, 
client4) but doesn't properly clean them up. Each client has scheduled timers 
and potentially other resources that should be destroyed. Add proper cleanup in 
a finally block or use @AfterEach to ensure all test clients are destroyed.
   ```suggestion
           TestReconnectTaskClient client1 = null;
           TestReconnectTaskClient client2 = null;
           TestReconnectTaskClient client3 = null;
           TestReconnectTaskClient client4 = null;
           try {
               // Mock clientChannelManager
               NettyClientChannelManager mockChannelManager = 
mock(NettyClientChannelManager.class);
               Field transactionRoleField = 
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
               transactionRoleField.setAccessible(true);
   
               client1 = new TestReconnectTaskClient(clientConfig, 
messageExecutor, testServiceGroup);
               Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
               timerStartedField.setAccessible(true);
               AtomicBoolean timerStarted1 = (AtomicBoolean) 
timerStartedField.get(client1);
               timerStarted1.set(false);
               Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
               reconnectTaskField.setAccessible(true);
               Runnable reconnectTask1 = (Runnable) 
reconnectTaskField.get(client1);
   
               reconnectTask1.run();
               verify(mockChannelManager, never()).reconnect(anyString());
   
               client2 = new TestReconnectTaskClient(clientConfig, 
messageExecutor, "");
               AtomicBoolean timerStarted2 = (AtomicBoolean) 
timerStartedField.get(client2);
               timerStarted2.set(true);
               Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
               channelManagerField.setAccessible(true);
               channelManagerField.set(client2, mockChannelManager);
   
               // verify reconnect
               Runnable reconnectTask2 = (Runnable) 
reconnectTaskField.get(client2);
               reconnectTask2.run();
               verify(mockChannelManager, never()).reconnect(anyString());
   
               client3 = new TestReconnectTaskClient(clientConfig, 
messageExecutor, testServiceGroup);
               AtomicBoolean timerStarted3 = (AtomicBoolean) 
timerStartedField.get(client3);
               timerStarted3.set(true);
               channelManagerField.set(client3, mockChannelManager);
   
               Runnable reconnectTask3 = (Runnable) 
reconnectTaskField.get(client3);
               reconnectTask3.run();
               verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
   
               client4 = new TestReconnectTaskClient(clientConfig, 
messageExecutor, testServiceGroup);
               AtomicBoolean timerStarted4 = (AtomicBoolean) 
timerStartedField.get(client4);
               timerStarted4.set(true);
               RuntimeException testEx = new 
RuntimeException("test-reconnect-error");
               
doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
               channelManagerField.set(client4, mockChannelManager);
   
               Runnable reconnectTask4 = (Runnable) 
reconnectTaskField.get(client4);
               assertDoesNotThrow(reconnectTask4::run);
           } finally {
               if (client1 != null) {
                   client1.destroy();
               }
               if (client2 != null) {
                   client2.destroy();
               }
               if (client3 != null) {
                   client3.destroy();
               }
               if (client4 != null) {
                   client4.destroy();
               }
               messageExecutor.shutdownNow();
           }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket() 
throws Exception {
         }
     }
 
+    @Test
+    public void testGetXidFromGlobalRollbackRequest() {
+        GlobalRollbackRequest request = new GlobalRollbackRequest();
+        request.setXid("rollback-xid-67890");
+        String xid = client.getXid(request);
+        assertEquals("rollback-xid-67890", xid);
+    }
+
+    @Test
+    public void testGetXidFromBranchReportRequest() {
+        BranchReportRequest request = new BranchReportRequest();
+        request.setXid("report-xid-54321");
+        String xid = client.getXid(request);
+        assertEquals("report-xid-54321", xid);
+    }
+
+    @Test
+    public void testReconnectTimerException() {
+        TestNettyRemotingClient client = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            // mock clientChannelManager.reconnect exception
+            NettyClientChannelManager mockManager = 
mock(NettyClientChannelManager.class);
+            doThrow(new RuntimeException("forced 
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+            Field managerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            managerField.setAccessible(true);
+            managerField.set(client, mockManager);
+
+            client.init();
+            Thread.sleep(100);
+        } catch (Exception e) {
+            fail("Reconnect timer exception test failed: " + e.getMessage());
+        } finally {
+            client.destroy();
+        }
+    }
+
+    @Test
+    public void testGetXidWithoutXidField() {
+        class NoXidMessage extends AbstractMessage {
+            private String name;
+
+            public NoXidMessage(String name) {
+                this.name = name;
+            }
+
+            @Override
+            public short getTypeCode() {
+                return 0;
+            }
+        }
+
+        NoXidMessage msg = new NoXidMessage("test-no-xid");
+        String xid1 = client.getXid(msg);
+        String xid2 = client.getXid(msg);
+
+        assertNotNull(xid1);
+        assertNotNull(xid2);
+        Assertions.assertNotEquals(xid1, xid2);
+    }
+
+    @Test
+    public void testInitAndDestroyMultipleTimes() {
+        TestNettyRemotingClient multiClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        try {
+            // initiate first
+            multiClient.init();
+
+            // verify status
+            assertNotNull(multiClient);

Review Comment:
   The assertion 'assertNotNull(multiClient)' on line 1564 is not meaningful 
since multiClient was just created on line 1557 and would never be null at this 
point. Consider replacing this with a more meaningful assertion that verifies 
the client is properly initialized, such as checking the timerStarted flag or 
verifying that the timer has been scheduled.
   ```suggestion
               // verify status: timer should be started after init
               try {
                   Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
                   timerStartedField.setAccessible(true);
                   AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(multiClient);
                   assertNotNull(timerStarted);
                   assertTrue(timerStarted.get());
               } catch (NoSuchFieldException | IllegalAccessException e) {
                   Assertions.fail("Failed to access timerStarted field on 
AbstractNettyRemotingClient", e);
               }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");
+    }
+
+    @Test
+    public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+        class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+            public TestClientWithEmptyServiceGroup(NettyClientConfig config, 
ThreadPoolExecutor executor) {
+                super(config, executor);
+            }
+
+            @Override
+            protected String getTransactionServiceGroup() {
+                return "";
+            }
+        }
+
+        TestClientWithEmptyServiceGroup testClient = new 
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+        assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+    }
+
+    @Test
+    public void testReconnectTaskThrowException() throws Exception {
+        TestNettyRemotingClientWithReconnectException exceptionClient =
+                new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
+
+        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+    }
+
+    @Test
+    public void testInitClientBootstrapStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        testClient.init();
+        verify(mockBootstrap, times(1)).start();

Review Comment:
   The testClient is initialized but not destroyed in a finally block. If an 
exception occurs during field access or assertions, the client's resources 
(timers, executors) won't be cleaned up properly. Add a finally block to ensure 
cleanup.
   ```suggestion
           try {
               Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
               clientBootstrapField.setAccessible(true);
               NettyClientBootstrap mockBootstrap = 
mock(NettyClientBootstrap.class);
               clientBootstrapField.set(testClient, mockBootstrap);
   
               testClient.init();
               verify(mockBootstrap, times(1)).start();
           } finally {
               testClient.destroy();
           }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");

Review Comment:
   The batchClient is initialized but never explicitly destroyed in a finally 
block, which could lead to resource leaks if the test fails. Consider adding a 
finally block to ensure proper cleanup of the client and its resources (timer 
executor, merge send executor, etc.).
   ```suggestion
           try {
               batchClient.init();
               ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
               assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
               assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the 
type of thread should be ThreadPoolExecutor");
   
               ThreadPoolExecutor threadPool = (ThreadPoolExecutor) 
mergeSendExecutor;
               assertEquals(1, threadPool.getCorePoolSize(), "core threadPool 
should be MAX_MERGE_SEND_THREAD");
               assertEquals(1, threadPool.getMaximumPoolSize(), "the max 
threadPool should be MAX_MERGE_SEND_THREAD");
           } finally {
               batchClient.destroy();
           }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");

Review Comment:
   Similar to the previous test, testClient is initialized but never destroyed, 
potentially leaking resources. Additionally, the test doesn't clean up the mock 
bootstrap that was injected. Consider adding proper cleanup in a finally block.
   ```suggestion
           } finally {
               if (testClient != null) {
                   testClient.destroy();
               }
           }
       }
   
       @Test
       public void testInitReconnectTimerStart() throws Exception {
           TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
           NettyClientBootstrap originalBootstrap = null;
           try {
               NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
               Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
               clientBootstrapField.setAccessible(true);
               originalBootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
               clientBootstrapField.set(testClient, mockBootstrap);
   
               Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
               timerStartedField.setAccessible(true);
               AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
               assertFalse(timerStarted.get(), "reconnect is not started");
   
               testClient.init();
   
               assertTrue(timerStarted.get(), "reconnect starts");
   
               Mockito.verify(mockBootstrap, Mockito.times(1)).start();
   
               Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
               timerExecutorField.setAccessible(true);
               ScheduledExecutorService timerExecutor = 
(ScheduledExecutorService) timerExecutorField.get(testClient);
               assertNotNull(timerExecutor, "thread can not be null");
           } finally {
               try {
                   Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
                   clientBootstrapField.setAccessible(true);
                   clientBootstrapField.set(testClient, originalBootstrap);
               } catch (Exception ignore) {
                   // ignore cleanup exceptions in test
               }
               if (testClient != null) {
                   testClient.destroy();
               }
           }
   ```



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");
+    }
+
+    @Test
+    public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+        class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+            public TestClientWithEmptyServiceGroup(NettyClientConfig config, 
ThreadPoolExecutor executor) {
+                super(config, executor);
+            }
+
+            @Override
+            protected String getTransactionServiceGroup() {
+                return "";
+            }
+        }
+
+        TestClientWithEmptyServiceGroup testClient = new 
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+        assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+    }
+
+    @Test
+    public void testReconnectTaskThrowException() throws Exception {
+        TestNettyRemotingClientWithReconnectException exceptionClient =
+                new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
+
+        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");

Review Comment:
   The exceptionClient is created but never destroyed. Add proper cleanup in a 
finally block to ensure resources are released even if the test fails.
   ```suggestion
           TestNettyRemotingClientWithReconnectException exceptionClient = null;
           try {
               exceptionClient = new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
               Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
               reconnectTaskField.setAccessible(true);
               Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
   
               assertDoesNotThrow(reconnectTask::run, "reconnectTask 
exception");
           } finally {
               if (exceptionClient != null) {
                   exceptionClient.destroy();
               }
           }
   ```



##########
core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java:
##########
@@ -120,29 +125,30 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
     @Override
     public void init() {
-        timerExecutor.scheduleAtFixedRate(
-                () -> {
-                    try {
-                        
clientChannelManager.reconnect(getTransactionServiceGroup());
-                    } catch (Exception ex) {
-                        LOGGER.warn("reconnect server failed. {}", 
ex.getMessage());
-                    }
-                },
-                SCHEDULE_DELAY_MILLS,
-                SCHEDULE_INTERVAL_MILLS,
-                TimeUnit.MILLISECONDS);
-        if (this.isEnableClientBatchSendRequest()) {
-            mergeSendExecutorService = new ThreadPoolExecutor(
-                    MAX_MERGE_SEND_THREAD,
-                    MAX_MERGE_SEND_THREAD,
-                    KEEP_ALIVE_TIME,
-                    TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<>(),
-                    new NamedThreadFactory(getThreadPrefix(), 
MAX_MERGE_SEND_THREAD));
-            mergeSendExecutorService.submit(new MergedSendRunnable());
+        reconnectLock.lock();
+        try {
+            if (timerStarted.compareAndSet(false, true)) {
+                timerExecutor.scheduleAtFixedRate(
+                        reconnectTask, SCHEDULE_DELAY_MILLS, 
SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
+                LOGGER.info("Reconnect timer started (role: {})", 
transactionRole.name());
+            }
+
+            if (this.isEnableClientBatchSendRequest()) {
+                mergeSendExecutorService = new ThreadPoolExecutor(
+                        MAX_MERGE_SEND_THREAD,
+                        MAX_MERGE_SEND_THREAD,
+                        KEEP_ALIVE_TIME,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new NamedThreadFactory(getThreadPrefix(), 
MAX_MERGE_SEND_THREAD));
+                mergeSendExecutorService.submit(new MergedSendRunnable());
+            }

Review Comment:
   The mergeSendExecutorService can be initialized multiple times if init() is 
called multiple times. While the timerStarted flag prevents duplicate timer 
scheduling, there's no similar protection for mergeSendExecutorService. This 
could lead to thread pool leaks if init() is called multiple times without 
calling destroy() in between. Consider adding a check to prevent 
re-initialization of the executor service, similar to the timer protection.



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1479,6 +1492,96 @@ public void testMergedSendRunnableWithEmptyBasket() 
throws Exception {
         }
     }
 
+    @Test
+    public void testGetXidFromGlobalRollbackRequest() {
+        GlobalRollbackRequest request = new GlobalRollbackRequest();
+        request.setXid("rollback-xid-67890");
+        String xid = client.getXid(request);
+        assertEquals("rollback-xid-67890", xid);
+    }
+
+    @Test
+    public void testGetXidFromBranchReportRequest() {
+        BranchReportRequest request = new BranchReportRequest();
+        request.setXid("report-xid-54321");
+        String xid = client.getXid(request);
+        assertEquals("report-xid-54321", xid);
+    }
+
+    @Test
+    public void testReconnectTimerException() {
+        TestNettyRemotingClient client = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            // mock clientChannelManager.reconnect exception
+            NettyClientChannelManager mockManager = 
mock(NettyClientChannelManager.class);
+            doThrow(new RuntimeException("forced 
error")).when(mockManager).reconnect(ArgumentMatchers.anyString());
+            Field managerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            managerField.setAccessible(true);
+            managerField.set(client, mockManager);
+
+            client.init();
+            Thread.sleep(100);
+        } catch (Exception e) {
+            fail("Reconnect timer exception test failed: " + e.getMessage());
+        } finally {
+            client.destroy();
+        }
+    }
+
+    @Test
+    public void testGetXidWithoutXidField() {
+        class NoXidMessage extends AbstractMessage {
+            private String name;
+
+            public NoXidMessage(String name) {
+                this.name = name;
+            }
+
+            @Override
+            public short getTypeCode() {
+                return 0;
+            }
+        }
+
+        NoXidMessage msg = new NoXidMessage("test-no-xid");
+        String xid1 = client.getXid(msg);
+        String xid2 = client.getXid(msg);
+
+        assertNotNull(xid1);
+        assertNotNull(xid2);
+        Assertions.assertNotEquals(xid1, xid2);
+    }
+
+    @Test
+    public void testInitAndDestroyMultipleTimes() {
+        TestNettyRemotingClient multiClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        try {
+            // initiate first
+            multiClient.init();
+
+            // verify status
+            assertNotNull(multiClient);
+
+            multiClient.destroy();
+            multiClient.destroy();
+
+        } finally {
+            // clean
+            try {
+                multiClient.destroy();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }

Review Comment:
   The test verifies that calling destroy() multiple times doesn't cause 
issues, but it doesn't verify that subsequent calls to init() after destroy() 
work correctly. This is important since the timerStarted flag might prevent 
re-initialization. Consider adding a test that calls init(), destroy(), init() 
sequence to ensure the client can be reinitialized properly.



##########
core/src/test/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClientTest.java:
##########
@@ -1559,4 +1662,186 @@ public NettyClientChannelManager 
getClientChannelManager() {
             return mockChannelManager != null ? mockChannelManager : 
super.getClientChannelManager();
         }
     }
+
+    @Test
+    public void testConstructorCoreInitialization() {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        try {
+            Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+            clientBootstrapField.setAccessible(true);
+            NettyClientBootstrap bootstrap = (NettyClientBootstrap) 
clientBootstrapField.get(testClient);
+            assertNotNull(bootstrap, "fail to init NettyClientBootstrap");
+
+            Field clientChannelManagerField =
+                    
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+            clientChannelManagerField.setAccessible(true);
+            NettyClientChannelManager channelManager =
+                    (NettyClientChannelManager) 
clientChannelManagerField.get(testClient);
+            assertNotNull(channelManager, "fail to init 
NettyClientChannelManager");
+
+            Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+            reconnectTaskField.setAccessible(true);
+            Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(testClient);
+            assertNotNull(reconnectTask, "fail to init reconnectTask");
+        } catch (Exception e) {
+            fail("test failed:" + e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitReconnectTimerStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+
+        NettyClientBootstrap mockBootstrap = 
Mockito.mock(NettyClientBootstrap.class);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted = (AtomicBoolean) 
timerStartedField.get(testClient);
+        assertFalse(timerStarted.get(), "reconnect is not started");
+
+        testClient.init();
+
+        assertTrue(timerStarted.get(), "reconnect starts");
+
+        Mockito.verify(mockBootstrap, Mockito.times(1)).start();
+
+        Field timerExecutorField = 
AbstractNettyRemoting.class.getDeclaredField("timerExecutor");
+        timerExecutorField.setAccessible(true);
+        ScheduledExecutorService timerExecutor = (ScheduledExecutorService) 
timerExecutorField.get(testClient);
+        assertNotNull(timerExecutor, "thread can not be null");
+    }
+
+    @Test
+    public void testInitMergeSendExecutorService() throws Exception {
+        TestNettyRemotingClientWithBatch batchClient =
+                new TestNettyRemotingClientWithBatch(clientConfig, 
messageExecutor);
+        Field mergeSendExecutorField = 
AbstractNettyRemotingClient.class.getDeclaredField("mergeSendExecutorService");
+        mergeSendExecutorField.setAccessible(true);
+        assertNull(mergeSendExecutorField.get(batchClient), "thread should be 
null");
+
+        batchClient.init();
+        ExecutorService mergeSendExecutor = (ExecutorService) 
mergeSendExecutorField.get(batchClient);
+        assertNotNull(mergeSendExecutor, "mergeSendExecutor should be 
initialized");
+        assertTrue(mergeSendExecutor instanceof ThreadPoolExecutor, "the type 
of thread should be ThreadPoolExecutor");
+
+        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) mergeSendExecutor;
+        assertEquals(1, threadPool.getCorePoolSize(), "core threadPool should 
be MAX_MERGE_SEND_THREAD");
+        assertEquals(1, threadPool.getMaximumPoolSize(), "the max threadPool 
should be MAX_MERGE_SEND_THREAD");
+    }
+
+    @Test
+    public void testReconnectTaskWithEmptyServiceGroup() throws Exception {
+        class TestClientWithEmptyServiceGroup extends TestNettyRemotingClient {
+            public TestClientWithEmptyServiceGroup(NettyClientConfig config, 
ThreadPoolExecutor executor) {
+                super(config, executor);
+            }
+
+            @Override
+            protected String getTransactionServiceGroup() {
+                return "";
+            }
+        }
+
+        TestClientWithEmptyServiceGroup testClient = new 
TestClientWithEmptyServiceGroup(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) reconnectTaskField.get(testClient);
+
+        assertDoesNotThrow(reconnectTask::run, "serviceGroup is null");
+    }
+
+    @Test
+    public void testReconnectTaskThrowException() throws Exception {
+        TestNettyRemotingClientWithReconnectException exceptionClient =
+                new 
TestNettyRemotingClientWithReconnectException(clientConfig, messageExecutor);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask = (Runnable) 
reconnectTaskField.get(exceptionClient);
+
+        assertDoesNotThrow(reconnectTask::run, "reconnectTask exception");
+    }
+
+    @Test
+    public void testInitClientBootstrapStart() throws Exception {
+        TestNettyRemotingClient testClient = new 
TestNettyRemotingClient(clientConfig, messageExecutor);
+        Field clientBootstrapField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
+        clientBootstrapField.setAccessible(true);
+        NettyClientBootstrap mockBootstrap = mock(NettyClientBootstrap.class);
+        clientBootstrapField.set(testClient, mockBootstrap);
+
+        testClient.init();
+        verify(mockBootstrap, times(1)).start();
+    }
+
+    static class TestReconnectTaskClient extends TestNettyRemotingClient {
+        private final String transactionServiceGroup;
+
+        public TestReconnectTaskClient(NettyClientConfig config, 
ThreadPoolExecutor executor, String serviceGroup) {
+            super(config, executor);
+            this.transactionServiceGroup = serviceGroup;
+        }
+
+        @Override
+        protected String getTransactionServiceGroup() {
+            return this.transactionServiceGroup;
+        }
+    }
+
+    @Test
+    public void testReconnectTask() throws Exception {
+        String testServiceGroup = "test-group";
+        NettyClientConfig clientConfig = new NettyClientConfig();
+        ThreadPoolExecutor messageExecutor =
+                new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>());
+        // Mock clientChannelManager
+        NettyClientChannelManager mockChannelManager = 
mock(NettyClientChannelManager.class);
+        Field transactionRoleField = 
AbstractNettyRemotingClient.class.getDeclaredField("transactionRole");
+        transactionRoleField.setAccessible(true);
+
+        TestReconnectTaskClient client1 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        Field timerStartedField = 
AbstractNettyRemotingClient.class.getDeclaredField("timerStarted");
+        timerStartedField.setAccessible(true);
+        AtomicBoolean timerStarted1 = (AtomicBoolean) 
timerStartedField.get(client1);
+        timerStarted1.set(false);
+        Field reconnectTaskField = 
AbstractNettyRemotingClient.class.getDeclaredField("reconnectTask");
+        reconnectTaskField.setAccessible(true);
+        Runnable reconnectTask1 = (Runnable) reconnectTaskField.get(client1);
+
+        reconnectTask1.run();
+        verify(mockChannelManager, never()).reconnect(anyString());
+
+        TestReconnectTaskClient client2 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, "");
+        AtomicBoolean timerStarted2 = (AtomicBoolean) 
timerStartedField.get(client2);
+        timerStarted2.set(true);
+        Field channelManagerField = 
AbstractNettyRemotingClient.class.getDeclaredField("clientChannelManager");
+        channelManagerField.setAccessible(true);
+        channelManagerField.set(client2, mockChannelManager);
+
+        // verify reconnect
+        Runnable reconnectTask2 = (Runnable) reconnectTaskField.get(client2);
+        reconnectTask2.run();
+        verify(mockChannelManager, never()).reconnect(anyString());
+
+        TestReconnectTaskClient client3 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        AtomicBoolean timerStarted3 = (AtomicBoolean) 
timerStartedField.get(client3);
+        timerStarted3.set(true);
+        channelManagerField.set(client3, mockChannelManager);
+
+        Runnable reconnectTask3 = (Runnable) reconnectTaskField.get(client3);
+        reconnectTask3.run();
+        verify(mockChannelManager, times(1)).reconnect(testServiceGroup);
+
+        TestReconnectTaskClient client4 = new 
TestReconnectTaskClient(clientConfig, messageExecutor, testServiceGroup);
+        AtomicBoolean timerStarted4 = (AtomicBoolean) 
timerStartedField.get(client4);
+        timerStarted4.set(true);
+        RuntimeException testEx = new RuntimeException("test-reconnect-error");
+        doThrow(testEx).when(mockChannelManager).reconnect(testServiceGroup);
+        channelManagerField.set(client4, mockChannelManager);
+
+        Runnable reconnectTask4 = (Runnable) reconnectTaskField.get(client4);
+        assertDoesNotThrow(reconnectTask4::run);
+    }

Review Comment:
   This test creates four separate client instances but uses the same 
mockChannelManager across all of them. This means the verify() calls are 
checking the cumulative behavior across all clients, not the individual client 
behavior. Each client should have its own mock instance to properly isolate the 
test cases and verify the expected behavior for each scenario independently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to