This is an automated email from the ASF dual-hosted git repository. xingfudeshi pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 7396e532ec test: add some UT cases for client processor (#7199) 7396e532ec is described below commit 7396e532ecb0905f7003f926e41e5df98c1ef9bc Author: jimin <sliev...@163.com> AuthorDate: Mon Mar 10 11:19:47 2025 +0800 test: add some UT cases for client processor (#7199) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 1 + .../org/apache/seata/core/rpc/RpcContextTest.java | 130 ++++++++++-- .../seata/core/rpc/hook/StatusRpcHookTest.java | 57 ++++++ .../client/ClientHeartbeatProcessorTest.java | 125 ++++++++++++ .../client/ClientOnResponseProcessorTest.java | 223 +++++++++++++++++++++ .../client/RmBranchCommitProcessorTest.java | 164 +++++++++++++++ .../client/RmBranchRollbackProcessorTest.java | 170 ++++++++++++++++ .../processor/client/RmUndoLogProcessorTest.java | 89 ++++++++ 9 files changed, 943 insertions(+), 17 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 5d5da91640..605b1a04ed 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -68,6 +68,7 @@ Add changes here for all PR submitted to the 2.x branch. 
- [[#7160](https://github.com/apache/incubator-seata/pull/7160)] Refactored tests in `LowerCaseLinkHashMapTest` to use parameterized unit testing - [[#7167](https://github.com/apache/incubator-seata/pull/7167)] Refactored tests in `DurationUtilTest` to simplify and use parameterized unit testing - [[#7189](https://github.com/apache/incubator-seata/pull/7189)] fix the runtime exception in the saga test case +- [[#7199](https://github.com/apache/incubator-seata/pull/7199)] add some UT cases for client processor ### refactor: diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index bd0c00be4e..8ed3e179a8 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -68,6 +68,7 @@ - [[#7160](https://github.com/apache/incubator-seata/pull/7160)] 在 LowerCaseLinkHashMapTest 中重构测试,以使用参数化单元测试 - [[#7167](https://github.com/apache/incubator-seata/pull/7167)] 重构了 DurationUtilTest 中的测试,以简化并使用参数化单元测试 - [[#7189](https://github.com/apache/incubator-seata/pull/7189)] 修复saga测试用例运行异常 +- [[#7199](https://github.com/apache/incubator-seata/pull/7199)] 增加 client processor 单测用例 ### refactor: diff --git a/core/src/test/java/org/apache/seata/core/rpc/RpcContextTest.java b/core/src/test/java/org/apache/seata/core/rpc/RpcContextTest.java index fdcc420c80..0e0e46709c 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/RpcContextTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/RpcContextTest.java @@ -16,10 +16,24 @@ */ package org.apache.seata.core.rpc; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import java.net.InetSocketAddress; import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import io.netty.channel.Channel; +import org.apache.seata.core.rpc.netty.NettyPoolKey; +import org.apache.seata.core.rpc.netty.NettyPoolKey.TransactionRole; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * RpcContext Test @@ -46,8 +60,8 @@ public class RpcContextTest { * RpcContext Constructor */ - @BeforeAll - public static void setup() { + @BeforeEach + public void setup() { rpcContext = new RpcContext(); } @@ -57,7 +71,7 @@ public class RpcContextTest { @Test public void testApplicationIdValue() { rpcContext.setApplicationId(ID); - Assertions.assertEquals(ID, rpcContext.getApplicationId()); + assertEquals(ID, rpcContext.getApplicationId()); } /** @@ -66,7 +80,7 @@ public class RpcContextTest { @Test public void testVersionValue() { rpcContext.setVersion(VERSION); - Assertions.assertEquals(VERSION, rpcContext.getVersion()); + assertEquals(VERSION, rpcContext.getVersion()); } /** @@ -75,7 +89,14 @@ public class RpcContextTest { @Test public void testClientIdValue() { rpcContext.setClientId(ID); - Assertions.assertEquals(ID, rpcContext.getClientId()); + assertEquals(ID, rpcContext.getClientId()); + } + + @Test + void testSetAndGetTransactionServiceGroup() { + String serviceGroup = "testGroup"; + rpcContext.setTransactionServiceGroup(serviceGroup); + assertEquals(serviceGroup, rpcContext.getTransactionServiceGroup(), "Transaction service group should match"); } /** @@ -84,7 +105,7 @@ public class RpcContextTest { @Test public void testChannelNull() { rpcContext.setChannel(null); - Assertions.assertNull(rpcContext.getChannel()); + assertNull(rpcContext.getChannel()); } /** @@ -94,7 +115,30 @@ public class RpcContextTest { @Test public void testTransactionServiceGroupValue() { rpcContext.setTransactionServiceGroup(TSG); - Assertions.assertEquals(TSG, rpcContext.getTransactionServiceGroup()); + 
assertEquals(TSG, rpcContext.getTransactionServiceGroup()); + } + + @Test + void testSetAndGetChannel() { + Channel mockChannel = Mockito.mock(Channel.class); + rpcContext.setChannel(mockChannel); + assertSame(mockChannel, rpcContext.getChannel(), "Channel should match"); + } + + @Test + void testSetAndGetClientRole() { + NettyPoolKey.TransactionRole role = NettyPoolKey.TransactionRole.TMROLE; + rpcContext.setClientRole(role); + assertEquals(role, rpcContext.getClientRole(), "Client role should match"); + } + + @Test + void testAddResource() { + String resource = "db1"; + rpcContext.addResource(resource); + Set<String> resources = rpcContext.getResourceSets(); + assertNotNull(resources, "Resource set should not be null"); + assertTrue(resources.contains(resource), "Resource should be added"); } /** @@ -103,7 +147,7 @@ public class RpcContextTest { @Test public void testClientRoleNull() { rpcContext.setClientRole(null); - Assertions.assertNull(rpcContext.getClientRole()); + assertNull(rpcContext.getClientRole()); } /** @@ -112,7 +156,7 @@ public class RpcContextTest { @Test public void testResourceSetsNull() { rpcContext.setResourceSets(null); - Assertions.assertNull(rpcContext.getResourceSets()); + assertNull(rpcContext.getResourceSets()); } /** @@ -123,7 +167,7 @@ public class RpcContextTest { HashSet<String> resourceSet = new HashSet<String>(); rpcContext.setResourceSets(resourceSet); rpcContext.addResource(null); - Assertions.assertEquals(0, rpcContext.getResourceSets().size()); + assertEquals(0, rpcContext.getResourceSets().size()); } /** @@ -134,7 +178,7 @@ public class RpcContextTest { public void testAddResourcesNull() { rpcContext.addResources(null); rpcContext.setResourceSets(null); - Assertions.assertNull(rpcContext.getResourceSets()); + assertNull(rpcContext.getResourceSets()); } /** @@ -145,7 +189,7 @@ public class RpcContextTest { HashSet<String> resourceSet = new HashSet<String>(); resourceSet.add(RV); rpcContext.addResources(resourceSet); - 
Assertions.assertEquals(resourceSet, rpcContext.getResourceSets()); + assertEquals(resourceSet, rpcContext.getResourceSets()); } /** @@ -161,7 +205,7 @@ public class RpcContextTest { rpcContext.addResources(resourceSet); rpcContext.setResourceSets(resourceSets); rpcContext.addResources(resourceSet); - Assertions.assertEquals(resourceSets, rpcContext.getResourceSets()); + assertEquals(resourceSets, rpcContext.getResourceSets()); } /** @@ -174,11 +218,63 @@ public class RpcContextTest { rpcContext.setClientId(null); rpcContext.setChannel(null); rpcContext.setResourceSets(null); - Assertions.assertEquals( + assertEquals( "RpcContext{" + "applicationId='" + rpcContext.getApplicationId() + '\'' + ", transactionServiceGroup='" + rpcContext.getTransactionServiceGroup() + '\'' + ", clientId='" + rpcContext.getClientId() + '\'' + ", channel=" + rpcContext.getChannel() + ", resourceSets=" + rpcContext.getResourceSets() + '}', rpcContext.toString()); } + @Test + void testHoldInIdentifiedChannels() { + ConcurrentMap<Channel, RpcContext> clientIDHolderMap = new ConcurrentHashMap<>(); + Channel mockChannel = Mockito.mock(Channel.class); + rpcContext.setChannel(mockChannel); + + rpcContext.holdInIdentifiedChannels(clientIDHolderMap); + assertSame(rpcContext, clientIDHolderMap.get(mockChannel), "RpcContext should be held in the map"); + } + + @Test + void testHoldInClientChannels() { + ConcurrentMap<Integer, RpcContext> clientTMHolderMap = new ConcurrentHashMap<>(); + Channel mockChannel = Mockito.mock(Channel.class); + rpcContext.setChannel(mockChannel); + Mockito.when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(8080)); + + rpcContext.holdInClientChannels(clientTMHolderMap); + Integer clientPort = 8080; // Assuming port is extracted from remote address + assertSame(rpcContext, clientTMHolderMap.get(clientPort), "RpcContext should be held in the map"); + } + + @Test + void testHoldInResourceManagerChannels() { + String resourceId = "db1"; + Integer clientPort = 
8080; + + rpcContext.holdInResourceManagerChannels(resourceId, clientPort); + ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientRMHolderMap = rpcContext.getClientRMHolderMap(); + assertNotNull(clientRMHolderMap, "Client RM holder map should not be null"); + + ConcurrentMap<Integer, RpcContext> portMap = clientRMHolderMap.get(resourceId); + assertNotNull(portMap, "Port map should not be null"); + assertSame(rpcContext, portMap.get(clientPort), "RpcContext should be held in the map"); + } + + @Test + void testRelease() { + Channel mockChannel = Mockito.mock(Channel.class); + rpcContext.setChannel(mockChannel); + Mockito.when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(8080)); + + // Setup data + rpcContext.setClientRole(TransactionRole.RMROLE); + ConcurrentMap<Integer, RpcContext> clientTMHolderMap = new ConcurrentHashMap<>(); + rpcContext.holdInClientChannels(clientTMHolderMap); + + rpcContext.release(); + assertNull(rpcContext.getClientRMHolderMap(), "Client RM holder map should be cleared"); + assertNull(rpcContext.getResourceSets(), "Resource sets should be cleared"); + } + } diff --git a/core/src/test/java/org/apache/seata/core/rpc/hook/StatusRpcHookTest.java b/core/src/test/java/org/apache/seata/core/rpc/hook/StatusRpcHookTest.java new file mode 100644 index 0000000000..525eae52d8 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/hook/StatusRpcHookTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.hook; + +import org.apache.seata.common.rpc.RpcStatus; +import org.apache.seata.core.protocol.RpcMessage; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class StatusRpcHookTest { + + @Test + void testDoBeforeRequest() { + String service = "192.168.1.1:8080"; + RpcStatus status = RpcStatus.getStatus(service); + assertNotNull(status, "RpcStatus should not be null"); + + StatusRpcHook hook = new StatusRpcHook(); + hook.doBeforeRequest("192.168.1.1:8080", new RpcMessage()); + + assertEquals(1, status.getActive(), "Active count should be incremented"); + + } + + @Test + void testDoAfterResponse() { + String service = "192.168.2.1:8080"; + RpcStatus status = RpcStatus.getStatus(service); + assertNotNull(status, "RpcStatus should not be null"); + + StatusRpcHook hook = new StatusRpcHook(); + hook.doBeforeRequest("192.168.2.1:8080", new RpcMessage()); + + assertEquals(1, status.getActive(), "Active count should be incremented"); + + hook.doAfterResponse("192.168.2.1:8080", new RpcMessage(),null); + + assertEquals(0, status.getActive(), "Active count should be decremented"); + assertEquals(1, status.getTotal(), "Active count should be incremented"); + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientHeartbeatProcessorTest.java b/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientHeartbeatProcessorTest.java new file mode 100644 index 
0000000000..3b40e69fc4 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientHeartbeatProcessorTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.processor.client; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.core.protocol.HeartbeatMessage; +import org.apache.seata.core.protocol.RpcMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.ArgumentMatchers; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * The type 
Client heartbeat processor test. + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class ClientHeartbeatProcessorTest { + private ClientHeartbeatProcessor processor; + private ChannelHandlerContext mockCtx; + private RpcMessage mockRpcMessage; + private Logger mockLogger; + private MockedStatic<LoggerFactory> mockedLoggerFactory; + + /** + * Sets up. + */ + @BeforeEach + void setUp() { + mockCtx = mock(ChannelHandlerContext.class); + mockRpcMessage = mock(RpcMessage.class); + mockLogger = mock(Logger.class); + + // Mock static LoggerFactory to control LOGGER behavior + mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class); + mockedLoggerFactory.when(() -> LoggerFactory.getLogger(ClientHeartbeatProcessor.class)).thenReturn(mockLogger); + processor = new ClientHeartbeatProcessor(); + } + + /** + * Process should log debug when receive pong message and debug enabled. + * + * @throws Exception the exception + */ + @Test + @Order(1) + void process_ShouldLogDebug_WhenReceivePongMessageAndDebugEnabled() throws Exception { + // Arrange + Channel mockChannel = mock(Channel.class); + when(mockCtx.channel()).thenReturn(mockChannel); + + SocketAddress mockRemoteAddress = new InetSocketAddress("127.0.0.1", 8080); + when(mockChannel.remoteAddress()).thenReturn(mockRemoteAddress); + + when(mockRpcMessage.getBody()).thenReturn(HeartbeatMessage.PONG); + when(mockLogger.isDebugEnabled()).thenReturn(true); + assertTrue(LoggerFactory.getLogger(ClientHeartbeatProcessor.class).isDebugEnabled()); + + // Act + processor.process(mockCtx, mockRpcMessage); + + // Assert + verify(mockLogger).debug("received PONG from {}", mockRemoteAddress); + } + + /** + * Process should not log when receive non pong message. 
+ * + * @throws Exception the exception + */ + @Test + @Order(2) + void process_ShouldNotLog_WhenReceiveNonPongMessage() throws Exception { + // Arrange + when(mockRpcMessage.getBody()).thenReturn("OTHER_MESSAGE"); + when(mockLogger.isDebugEnabled()).thenReturn(true); + + // Act + processor.process(mockCtx, mockRpcMessage); + + // Assert + verify(mockLogger, never()).debug(anyString(), ArgumentMatchers.<Object[]>any()); + } + + /** + * Tear down. + */ + @AfterEach + void tearDown() { + if (mockedLoggerFactory != null) { + mockedLoggerFactory.close(); + } + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessorTest.java b/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessorTest.java new file mode 100644 index 0000000000..6e72abf57d --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessorTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seata.core.rpc.processor.client; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.core.protocol.AbstractResultMessage; +import org.apache.seata.core.protocol.BatchResultMessage; +import org.apache.seata.core.protocol.MergeMessage; +import org.apache.seata.core.protocol.MergeResultMessage; +import org.apache.seata.core.protocol.MergedWarpMessage; +import org.apache.seata.core.protocol.MessageFuture; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.transaction.GlobalBeginRequest; +import org.apache.seata.core.protocol.transaction.GlobalCommitResponse; +import org.apache.seata.core.rpc.TransactionMessageHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * The type Client on response processor test. 
+ */ +class ClientOnResponseProcessorTest { + + private ChannelHandlerContext mockCtx; + private RpcMessage mockRpcMessage; + private Map<Integer, MergeMessage> mergeMsgMap; + private ConcurrentHashMap<Integer, MessageFuture> futures; + private Map<Integer, Integer> childToParentMap; + private TransactionMessageHandler mockTransactionMessageHandler; + private Logger mockLogger; + + private ClientOnResponseProcessor processor; + + private MockedStatic<LoggerFactory> mockedLogger; + + /** + * Sets up. + */ + @BeforeEach + void setUp() { + mockCtx = mock(ChannelHandlerContext.class); + mockRpcMessage = mock(RpcMessage.class); + mergeMsgMap = new HashMap<>(); + futures = new ConcurrentHashMap<>(); + childToParentMap = new HashMap<>(); + mockTransactionMessageHandler = mock(TransactionMessageHandler.class); + mockLogger = mock(Logger.class); + + // Mock static logger + mockedLogger = Mockito.mockStatic(LoggerFactory.class); + mockedLogger.when(() -> LoggerFactory.getLogger(ClientOnResponseProcessor.class)).thenReturn(mockLogger); + + processor = new ClientOnResponseProcessor(mergeMsgMap, futures, childToParentMap, + mockTransactionMessageHandler); + } + + /** + * Process merge result message. 
+ * + * @throws Exception the exception + */ + @Test + @Disabled + void processMergeResultMessage() throws Exception { + // Setup merge result message + MergeResultMessage mockMergeResult = mock(MergeResultMessage.class); + when(mockRpcMessage.getBody()).thenReturn(mockMergeResult); + GlobalCommitResponse gitCommitResponse = new GlobalCommitResponse(); + mockMergeResult.setMsgs(new AbstractResultMessage[] {gitCommitResponse}); + int rpcId = 123; + when(mockRpcMessage.getId()).thenReturn(rpcId); + + MergedWarpMessage mergedWarp = new MergedWarpMessage(); + GlobalBeginRequest mockRpc = mock(GlobalBeginRequest.class); + mergedWarp.msgs.add(mockRpc); + mergedWarp.msgIds.add(456); + mergeMsgMap.put(rpcId, mergedWarp); + + // Configure future + MessageFuture mockFuture = mock(MessageFuture.class); + futures.put(456, mockFuture); + + // Execute + processor.process(mockCtx, mockRpcMessage); + + // Verify + verify(futures).remove(456); + verify(childToParentMap).remove(456); + verify(mockFuture).setResultMessage(any()); + verify(mockLogger, never()).error(anyString(), any(), any()); + } + + /** + * Process batch result message. 
+ * + * @throws Exception the exception + */ + @Test + void processBatchResultMessage() throws Exception { + // Setup batch result message + BatchResultMessage mockBatchResult = mock(BatchResultMessage.class); + when(mockRpcMessage.getBody()).thenReturn(mockBatchResult); + when(mockBatchResult.getMsgIds()).thenReturn(Collections.singletonList(789)); + when(mockBatchResult.getResultMessages()).thenReturn( + Collections.singletonList(mock(AbstractResultMessage.class))); + + // Configure child-parent mapping + childToParentMap.put(789, 101112); + mergeMsgMap.put(101112, mock(MergeMessage.class)); + + // Configure future + MessageFuture mockFuture = mock(MessageFuture.class); + futures.put(789, mockFuture); + + // Execute + processor.process(mockCtx, mockRpcMessage); + + // Verify + assertFalse(futures.containsKey(789), "Future should be removed from the map"); + assertFalse(childToParentMap.containsKey(789), "Child-parent mapping should be removed"); + assertFalse(mergeMsgMap.containsKey(101112), "Parent message should be removed"); + verify(mockFuture).setResultMessage(any()); + } + + /** + * Process generic result message with future. + * + * @throws Exception the exception + */ + @Test + void processGenericResultMessageWithFuture() throws Exception { + // Setup generic message + AbstractResultMessage mockResult = mock(AbstractResultMessage.class); + when(mockRpcMessage.getBody()).thenReturn(mockResult); + int msgId = 131415; + when(mockRpcMessage.getId()).thenReturn(msgId); + + // Configure future + MessageFuture mockFuture = mock(MessageFuture.class); + futures.put(msgId, mockFuture); + + // Execute + processor.process(mockCtx, mockRpcMessage); + + // Verify + assertFalse(futures.containsKey(msgId), "Future should be removed from the map"); + verify(mockFuture).setResultMessage(mockResult); + verify(mockTransactionMessageHandler, never()).onResponse(any(), any()); + } + + /** + * Process generic result message without future. 
+ * + * @throws Exception the exception + */ + @Test + void processGenericResultMessageWithoutFuture() throws Exception { + // Setup generic message + AbstractResultMessage mockResult = mock(AbstractResultMessage.class); + when(mockRpcMessage.getBody()).thenReturn(mockResult); + int msgId = 161718; + when(mockRpcMessage.getId()).thenReturn(msgId); + + // No future exists + + // Execute + processor.process(mockCtx, mockRpcMessage); + + // Verify + verify(mockTransactionMessageHandler).onResponse(mockResult, null); + verify(mockLogger, never()).error(anyString(), any(Object.class), any(Object.class)); + } + + /** + * Tear down. + */ + @AfterEach + void tearDown() { + mockCtx = null; + mockRpcMessage = null; + mergeMsgMap = null; + futures = null; + childToParentMap = null; + mockTransactionMessageHandler = null; + if (mockedLogger != null) { + mockedLogger.close(); + } + } +} + diff --git a/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchCommitProcessorTest.java b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchCommitProcessorTest.java new file mode 100644 index 0000000000..303cf411a3 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchCommitProcessorTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.processor.client; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.transaction.BranchCommitRequest; +import org.apache.seata.core.protocol.transaction.BranchCommitResponse; +import org.apache.seata.core.rpc.RemotingClient; +import org.apache.seata.core.rpc.TransactionMessageHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * The type Rm branch commit processor test. + */ +public class RmBranchCommitProcessorTest { + private ChannelHandlerContext mockCtx; + private RpcMessage mockRpcMessage; + private TransactionMessageHandler mockHandler; + private RemotingClient mockRemotingClient; + private Logger mockLogger; + private MockedStatic<LoggerFactory> mockedLoggerFactory; + private MockedStatic<NetUtil> mockedNetUtil; + private RmBranchCommitProcessor processor; + + /** + * Sets up. 
+ */ + @BeforeEach + void setUp() { + mockCtx = mock(ChannelHandlerContext.class); + mockRpcMessage = mock(RpcMessage.class); + mockHandler = mock(TransactionMessageHandler.class); + mockRemotingClient = mock(RemotingClient.class); + mockLogger = mock(Logger.class); + when(mockLogger.isInfoEnabled()).thenReturn(true); + mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class); + mockedLoggerFactory.when(() -> LoggerFactory.getLogger(RmBranchCommitProcessor.class)).thenReturn(mockLogger); + + mockedNetUtil = Mockito.mockStatic(NetUtil.class); + + processor = new RmBranchCommitProcessor(mockHandler, mockRemotingClient); + //setField(null, "LOGGER", mockLogger); + } + + /** + * Tear down. + */ + @AfterEach + void tearDown() { + mockedLoggerFactory.close(); + mockedNetUtil.close(); + } + + /** + * Process should handle branch commit and send response. + * + * @throws Exception the exception + */ + @Test + void processShouldHandleBranchCommitAndSendResponse() throws Exception { + InetSocketAddress mockAddress = new InetSocketAddress("127.0.0.1", 8091); + Channel mockChannel = mock(Channel.class); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(mockAddress); + mockedNetUtil.when(() -> NetUtil.toStringAddress(any(SocketAddress.class))).thenReturn("127.0.0.1:8091"); + + BranchCommitRequest mockRequest = mock(BranchCommitRequest.class); + BranchCommitResponse mockResponse = mock(BranchCommitResponse.class); + when(mockRpcMessage.getBody()).thenReturn(mockRequest); + when(mockHandler.onRequest(mockRequest, null)).thenReturn(mockResponse); + when(mockLogger.isInfoEnabled()).thenReturn(true); + + processor.process(mockCtx, mockRpcMessage); + + verify(mockHandler).onRequest(mockRequest, null); + verify(mockRemotingClient).sendAsyncResponse("127.0.0.1:8091", mockRpcMessage, mockResponse); + } + + /** + * Process should log error when send fails. 
+ * + * @throws Exception the exception + */ + @Test + void processShouldLogErrorWhenSendFails() throws Exception { + InetSocketAddress mockAddress = new InetSocketAddress("127.0.0.1", 8091); + Channel mockChannel = mock(Channel.class); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(mockAddress); + mockedNetUtil.when(() -> NetUtil.toStringAddress(any(SocketAddress.class))).thenReturn("127.0.0.1:8091"); + + BranchCommitRequest mockRequest = mock(BranchCommitRequest.class); + BranchCommitResponse mockResponse = mock(BranchCommitResponse.class); + when(mockRpcMessage.getBody()).thenReturn(mockRequest); + when(mockHandler.onRequest(mockRequest, null)).thenReturn(mockResponse); + + Throwable simulatedError = new RuntimeException("Network failure"); + doThrow(simulatedError).when(mockRemotingClient).sendAsyncResponse(anyString(), any(), any()); + + processor.process(mockCtx, mockRpcMessage); + + verify(mockLogger).error(eq("branch commit error: {}"), eq("Network failure"), eq(simulatedError)); + } + + /** + * Process should not log debug when disabled. 
+ * + * @throws Exception the exception + */ + @Test + void processShouldNotLogDebugWhenDisabled() throws Exception { + InetSocketAddress mockAddress = new InetSocketAddress("127.0.0.1", 8091); + Channel mockChannel = mock(Channel.class); + when(mockCtx.channel()).thenReturn(mockChannel); + when(mockChannel.remoteAddress()).thenReturn(mockAddress); + mockedNetUtil.when(() -> NetUtil.toStringAddress(any(SocketAddress.class))).thenReturn("127.0.0.1:8091"); + + BranchCommitRequest mockRequest = mock(BranchCommitRequest.class); + BranchCommitResponse mockResponse = mock(BranchCommitResponse.class); + when(mockRpcMessage.getBody()).thenReturn(mockRequest); + when(mockHandler.onRequest(mockRequest, null)).thenReturn(mockResponse); + when(mockLogger.isInfoEnabled()).thenReturn(true); + when(mockLogger.isDebugEnabled()).thenReturn(false); + + processor.process(mockCtx, mockRpcMessage); + + verify(mockLogger, never()).debug(anyString(), (Object[])any()); + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchRollbackProcessorTest.java b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchRollbackProcessorTest.java new file mode 100644 index 0000000000..21b9469295 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmBranchRollbackProcessorTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.processor.client; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.transaction.BranchRollbackRequest; +import org.apache.seata.core.protocol.transaction.BranchRollbackResponse; +import org.apache.seata.core.rpc.RemotingClient; +import org.apache.seata.core.rpc.TransactionMessageHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * The type Rm branch rollback processor test. 
+ */
+public class RmBranchRollbackProcessorTest {
+    /** Host of the simulated remote peer. */
+    private static final String REMOTE_HOST = "127.0.0.1";
+    /** Port of the simulated remote peer. */
+    private static final int REMOTE_PORT = 8091;
+    /** String that the stubbed {@code NetUtil.toStringAddress} returns for the remote peer. */
+    private static final String REMOTE_ADDRESS = "127.0.0.1:8091";
+
+    private ChannelHandlerContext mockCtx;
+    private RpcMessage mockRpcMessage;
+    private TransactionMessageHandler mockHandler;
+    private RemotingClient mockRemotingClient;
+    private Logger mockLogger;
+    private MockedStatic<LoggerFactory> mockedLoggerFactory;
+    private MockedStatic<NetUtil> mockedNetUtil;
+    private RmBranchRollbackProcessor processor;
+
+    /**
+     * Creates fresh mocks, registers the static mocks for {@code LoggerFactory} and {@code NetUtil},
+     * and builds the processor under test.
+     */
+    @BeforeEach
+    void setUp() {
+        mockCtx = mock(ChannelHandlerContext.class);
+        mockRpcMessage = mock(RpcMessage.class);
+        mockHandler = mock(TransactionMessageHandler.class);
+        mockRemotingClient = mock(RemotingClient.class);
+        mockLogger = mock(Logger.class);
+
+        // NOTE(review): the stubbed logger is only picked up if the processor asks LoggerFactory for its
+        // logger after this point; presumably it does so once, at class initialisation - confirm that the
+        // assertions on mockLogger do not depend on which test loads the processor class first.
+        mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class);
+        mockedLoggerFactory.when(() -> LoggerFactory.getLogger(RmBranchRollbackProcessor.class)).thenReturn(mockLogger);
+        mockedNetUtil = Mockito.mockStatic(NetUtil.class);
+        processor = new RmBranchRollbackProcessor(mockHandler, mockRemotingClient);
+    }
+
+    /**
+     * Process should handle branch rollback and send response when request valid.
+     *
+     * @throws Exception if the processor under test propagates one
+     */
+    @Test
+    void process_ShouldHandleBranchRollbackAndSendResponse_WhenRequestValid() throws Exception {
+        // Arrange
+        stubRemoteAddress();
+        BranchRollbackResponse mockResponse = mock(BranchRollbackResponse.class);
+        BranchRollbackRequest mockRequest = stubRollbackRequest(mockResponse);
+        when(mockLogger.isInfoEnabled()).thenReturn(true);
+
+        // Act
+        processor.process(mockCtx, mockRpcMessage);
+
+        // Assert: the request reached the handler and its response was sent to the resolved address.
+        verify(mockHandler).onRequest(mockRequest, null);
+        verify(mockRemotingClient).sendAsyncResponse(REMOTE_ADDRESS, mockRpcMessage, mockResponse);
+    }
+
+    /**
+     * Process should not log when info disabled.
+     *
+     * @throws Exception if the processor under test propagates one
+     */
+    @Test
+    void process_ShouldNotLog_WhenInfoDisabled() throws Exception {
+        // Arrange
+        stubRemoteAddress();
+        stubRollbackRequest(mock(BranchRollbackResponse.class));
+        when(mockLogger.isInfoEnabled()).thenReturn(false);
+
+        // Act
+        processor.process(mockCtx, mockRpcMessage);
+
+        // Assert: the (Object[]) matcher binds to info(String, Object...) only, so on its own it
+        // cannot notice a call to the single-argument info(String) overload; exclude that one too.
+        // NOTE(review): presumably the processor guards its info(String) call with isInfoEnabled() - confirm.
+        verify(mockLogger, never()).info(anyString());
+        verify(mockLogger, never()).info(anyString(), (Object[])any());
+    }
+
+    /**
+     * Process should log error when send response fails.
+     *
+     * @throws Exception if the processor under test propagates one
+     */
+    @Test
+    void process_ShouldLogError_WhenSendResponseFails() throws Exception {
+        // Arrange
+        stubRemoteAddress();
+        stubRollbackRequest(mock(BranchRollbackResponse.class));
+
+        Throwable simulatedError = new RuntimeException("Network error");
+        doThrow(simulatedError).when(mockRemotingClient).sendAsyncResponse(anyString(), any(), any());
+
+        // Act
+        processor.process(mockCtx, mockRpcMessage);
+
+        // Assert: the failure is logged with its message and with the throwable itself.
+        verify(mockLogger).error(eq("send response error: {}"), eq("Network error"), eq(simulatedError));
+    }
+
+    /**
+     * Closes both static mocks.
+     * Each close is null-guarded, and the second runs in a {@code finally}, so that a failure in
+     * {@link #setUp()} or in the first close cannot leave a static mock registered on this thread.
+     */
+    @AfterEach
+    void tearDown() {
+        try {
+            if (mockedLoggerFactory != null) {
+                mockedLoggerFactory.close();
+            }
+        } finally {
+            if (mockedNetUtil != null) {
+                mockedNetUtil.close();
+            }
+        }
+    }
+
+    /**
+     * Makes {@code mockCtx} expose a channel whose remote address is {@code 127.0.0.1:8091} and stubs
+     * {@code NetUtil.toStringAddress} to return {@link #REMOTE_ADDRESS} for any socket address.
+     */
+    private void stubRemoteAddress() {
+        InetSocketAddress mockAddress = new InetSocketAddress(REMOTE_HOST, REMOTE_PORT);
+        Channel mockChannel = mock(Channel.class);
+        when(mockCtx.channel()).thenReturn(mockChannel);
+        when(mockChannel.remoteAddress()).thenReturn(mockAddress);
+        mockedNetUtil.when(() -> NetUtil.toStringAddress(any(SocketAddress.class))).thenReturn(REMOTE_ADDRESS);
+    }
+
+    /**
+     * Puts a mocked rollback request into {@code mockRpcMessage} and makes {@code mockHandler} answer it.
+     *
+     * @param response the response the handler returns for the request
+     * @return the mocked request carried by the message
+     */
+    private BranchRollbackRequest stubRollbackRequest(BranchRollbackResponse response) {
+        BranchRollbackRequest request = mock(BranchRollbackRequest.class);
+        when(mockRpcMessage.getBody()).thenReturn(request);
+        when(mockHandler.onRequest(request, null)).thenReturn(response);
+        return request;
+    }
+
+}
diff --git a/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmUndoLogProcessorTest.java b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmUndoLogProcessorTest.java
new file mode 100644
index 0000000000..3a801f2c2f
--- /dev/null
+++ b/core/src/test/java/org/apache/seata/core/rpc/processor/client/RmUndoLogProcessorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.processor.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.protocol.transaction.UndoLogDeleteRequest;
+import org.apache.seata.core.rpc.TransactionMessageHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link RmUndoLogProcessor}.
+ */
+public class RmUndoLogProcessorTest {
+    private Logger mockLogger;
+    private MockedStatic<LoggerFactory> mockedLoggerFactory;
+    private TransactionMessageHandler mockHandler;
+    private ChannelHandlerContext mockCtx;
+    private RpcMessage mockRpcMessage;
+    private RmUndoLogProcessor processor;
+
+    /**
+     * Builds the collaborator mocks, statically mocks {@code LoggerFactory} so that it hands out
+     * {@code mockLogger} for {@code RmUndoLogProcessor}, and creates the processor under test.
+     */
+    @BeforeEach
+    void setUp() {
+        // Plain mocks are created before LoggerFactory is statically mocked.
+        mockLogger = mock(Logger.class);
+        mockHandler = mock(TransactionMessageHandler.class);
+        mockRpcMessage = mock(RpcMessage.class);
+        mockCtx = mock(ChannelHandlerContext.class);
+
+        mockedLoggerFactory = Mockito.mockStatic(LoggerFactory.class);
+        mockedLoggerFactory.when(() -> LoggerFactory.getLogger(RmUndoLogProcessor.class)).thenReturn(mockLogger);
+
+        processor = new RmUndoLogProcessor(mockHandler);
+    }
+
+    /**
+     * Verifies that processing a message hands its body to the transaction message handler.
+     *
+     * @throws Exception if the processor under test propagates one
+     */
+    @Test
+    void process_ShouldInvokeHandler() throws Exception {
+        // Arrange: the RPC message carries an undo-log delete request.
+        UndoLogDeleteRequest deleteRequest = mock(UndoLogDeleteRequest.class);
+        when(mockRpcMessage.getBody()).thenReturn(deleteRequest);
+
+        // Act
+        processor.process(mockCtx, mockRpcMessage);
+
+        // Assert: the handler received that request together with a null second argument.
+        verify(mockHandler).onRequest(deleteRequest, null);
+    }
+
+    /**
+     * Releases the static {@code LoggerFactory} mock if it was registered.
+     */
+    @AfterEach
+    void tearDown() {
+        if (mockedLoggerFactory == null) {
+            return;
+        }
+        mockedLoggerFactory.close();
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org