This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 766be773e0 bugfix: Resolve `NullPointerException` in `EtcdRegistryServiceImplMockTest` (#7349) 766be773e0 is described below commit 766be773e00b7db9361e5b411e85367770883841 Author: Yongjun Hong <yongj...@apache.org> AuthorDate: Tue May 20 00:03:09 2025 +0900 bugfix: Resolve `NullPointerException` in `EtcdRegistryServiceImplMockTest` (#7349) --- changes/en-us/2.x.md | 2 +- changes/zh-cn/2.x.md | 3 +- .../registry/etcd3/EtcdRegistryServiceImpl.java | 128 ++++++++++++--------- .../etcd/EtcdRegistryServiceImplMockTest.java | 119 +++++++++++-------- 4 files changed, 145 insertions(+), 107 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index e17a54162d..a66c722a65 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -25,7 +25,7 @@ Add changes here for all PR submitted to the 2.x branch. ### bugfix: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] fix XXX +- [[#7349](https://github.com/apache/incubator-seata/pull/7349)] Resolve NullPointerException in EtcdRegistryServiceImplMockTest diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 98887b0bbe..9f43a73ff0 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -25,8 +25,7 @@ ### bugfix: -- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] fix XXX - +- [[#7349](https://github.com/apache/incubator-seata/pull/7349)] 解决 EtcdRegistryServiceImplMockTest 中的空指针异常 ### optimize: diff --git a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index 96d3cf0511..4c43212d69 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -16,6 +16,7 @@ */ package org.apache.seata.discovery.registry.etcd3; +import static io.netty.util.CharsetUtil.UTF_8; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; @@ -28,24 +29,11 @@ import io.etcd.jetcd.options.LeaseOption; import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.options.WatchOption; import io.etcd.jetcd.watch.WatchResponse; -import org.apache.seata.common.exception.ShouldNeverHappenException; -import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.common.util.NetUtil; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.config.Configuration; -import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.config.exception.ConfigNotFoundException; -import org.apache.seata.discovery.registry.RegistryHeartBeats; -import org.apache.seata.discovery.registry.RegistryService; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.net.InetSocketAddress; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -56,9 +44,17 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - -import static io.netty.util.CharsetUtil.UTF_8; - +import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.config.exception.ConfigNotFoundException; +import org.apache.seata.discovery.registry.RegistryHeartBeats; +import org.apache.seata.discovery.registry.RegistryService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> { @@ -71,7 +67,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> private static final String REGISTRY_CLUSTER = "cluster"; private static final String DEFAULT_CLUSTER_NAME = "default"; private static final String REGISTRY_KEY_PREFIX = "registry-seata-"; - private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR; + private static final String FILE_CONFIG_KEY_PREFIX = + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR; private static final int MAP_INITIAL_CAPACITY = 8; private static final int THREAD_POOL_SIZE = 2; private ExecutorService executorService; @@ -84,11 +81,12 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> /** * interval for life keep */ - private final static long LIFE_KEEP_INTERVAL = 5; + private static final long LIFE_KEEP_INTERVAL = 5; /** * critical value for life keep */ - private final static long LIFE_KEEP_CRITICAL = 6; + private static final long LIFE_KEEP_CRITICAL = 6; + private static volatile EtcdRegistryServiceImpl instance; private static volatile Client client; private ConcurrentMap<String, Pair<Long /*revision*/, List<InetSocketAddress>>> clusterAddressMap; @@ -102,12 +100,17 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> */ public static final String TEST_ENDPONT = "etcd-test-lancher-endpoint"; - private EtcdRegistryServiceImpl() { clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); listenerMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); watcherMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY); - executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("registry-etcd3", THREAD_POOL_SIZE)); + executorService = new ThreadPoolExecutor( + THREAD_POOL_SIZE, + THREAD_POOL_SIZE, + Integer.MAX_VALUE, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("registry-etcd3", THREAD_POOL_SIZE)); } /** @@ -126,7 +129,6 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> return instance; } - @Override public void register(InetSocketAddress address) throws Exception { NetUtil.validAddress(address); @@ -141,10 +143,12 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> */ private void doRegister(InetSocketAddress address) throws Exception { PutOption putOption = PutOption.newBuilder().withLeaseId(getLeaseId()).build(); - getClient().getKVClient().put(buildRegistryKey(address), buildRegistryValue(address), putOption).get(); + getClient() + .getKVClient() + .put(buildRegistryKey(address), buildRegistryValue(address), putOption) + .get(); } - @Override public void unregister(InetSocketAddress address) throws Exception { NetUtil.validAddress(address); @@ -163,8 +167,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> @Override public void subscribe(String cluster, Watch.Listener listener) throws Exception { - listenerMap.computeIfAbsent(cluster, key -> new HashSet<>()) - .add(listener); + listenerMap.computeIfAbsent(cluster, key -> new HashSet<>()).add(listener); EtcdWatcher watcher = watcherMap.computeIfAbsent(cluster, w -> new EtcdWatcher(cluster, listener)); executorService.submit(watcher); } @@ -178,6 +181,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> .collect(Collectors.toSet()); listenerMap.put(cluster, newSubscribeSet); } + watcherMap.remove(cluster).stop(); } @@ -194,9 +198,9 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> private List<InetSocketAddress> lookupByCluster(String cluster) throws Exception { if (!listenerMap.containsKey(cluster)) { - //1.refresh + // 1.refresh refreshCluster(cluster); - //2.subscribe + // 2.subscribe subscribe(cluster, new Watch.Listener() { @Override public void onNext(WatchResponse response) { @@ -209,16 +213,11 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> } @Override - public void onError(Throwable throwable) { - - } + public void onError(Throwable throwable) {} @Override - public void onCompleted() { - - } + public void onCompleted() {} }); - } Pair<Long, List<InetSocketAddress>> pair = clusterAddressMap.get(cluster); return Objects.isNull(pair) ? Collections.emptyList() : pair.getValue(); @@ -232,7 +231,6 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> lifeKeeperFuture.get(3, TimeUnit.SECONDS); } } - } /** @@ -245,14 +243,22 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> if (cluster == null) { return; } - //1.get all available registries - GetOption getOption = GetOption.newBuilder().withPrefix(buildRegistryKeyPrefix(cluster)).build(); - GetResponse getResponse = getClient().getKVClient().get(buildRegistryKeyPrefix(cluster), getOption).get(); - //2.add to list - List<InetSocketAddress> instanceList = getResponse.getKvs().stream().map(keyValue -> { - String[] instanceInfo = NetUtil.splitIPPortStr(keyValue.getValue().toString(UTF_8)); - return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1])); - }).collect(Collectors.toList()); + // 1.get all available registries + GetOption getOption = GetOption.newBuilder() + .withPrefix(buildRegistryKeyPrefix(cluster)) + .build(); + GetResponse getResponse = getClient() + .getKVClient() + .get(buildRegistryKeyPrefix(cluster), getOption) + .get(); + // 2.add to list + List<InetSocketAddress> instanceList = getResponse.getKvs().stream() + .map(keyValue -> { + String[] instanceInfo = + NetUtil.splitIPPortStr(keyValue.getValue().toString(UTF_8)); + return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1])); + }) + .collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); removeOfflineAddressesIfNecessary(transactionServiceGroup, cluster, instanceList); @@ -271,7 +277,9 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> if (StringUtils.isNotBlank(testEndpoint)) { client = Client.builder().endpoints(testEndpoint).build(); } else { - client = Client.builder().endpoints(FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY)).build(); + client = Client.builder() + .endpoints(FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY)) + .build(); } } } @@ -285,7 +293,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> * @return */ private String getClusterName() { - String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER); + String clusterConfigName = + String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER); return FILE_CONFIG.getConfig(clusterConfigName, DEFAULT_CLUSTER_NAME); } @@ -294,7 +303,7 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> */ private long getLeaseId() throws Exception { if (0 == leaseId) { - //create a new lease + // create a new lease leaseId = getClient().getLeaseClient().grant(TTL).get().getID(); lifeKeeper = new EtcdLifeKeeper(leaseId); lifeKeeperFuture = executorService.submit(lifeKeeper); @@ -308,7 +317,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> * @return registry key */ private ByteSequence buildRegistryKey(InetSocketAddress address) { - return ByteSequence.from(REGISTRY_KEY_PREFIX + getClusterName() + "-" + NetUtil.toStringAddress(address), UTF_8); + return ByteSequence.from( + REGISTRY_KEY_PREFIX + getClusterName() + "-" + NetUtil.toStringAddress(address), UTF_8); } /** @@ -339,12 +349,10 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> private final Lease leaseClient; private boolean running; - public EtcdLifeKeeper(long leaseId) { this.leaseClient = getClient().getLeaseClient(); this.leaseId = leaseId; this.running = true; - } /** @@ -353,11 +361,13 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> private void process() { for (; ; ) { try { - //1.get TTL - LeaseTimeToLiveResponse leaseTimeToLiveResponse = this.leaseClient.timeToLive(this.leaseId, LeaseOption.DEFAULT).get(); + // 1.get TTL + LeaseTimeToLiveResponse leaseTimeToLiveResponse = this.leaseClient + .timeToLive(this.leaseId, LeaseOption.DEFAULT) + .get(); final long tTl = leaseTimeToLiveResponse.getTTl(); if (tTl <= LIFE_KEEP_CRITICAL) { - //2.refresh the TTL + // 2.refresh the TTL this.leaseClient.keepAliveOnce(this.leaseId).get(); } TimeUnit.SECONDS.sleep(LIFE_KEEP_INTERVAL); @@ -401,20 +411,24 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> @Override public void run() { Watch watchClient = getClient().getWatchClient(); - WatchOption.Builder watchOptionBuilder = WatchOption.newBuilder().withPrefix(buildRegistryKeyPrefix(cluster)); + WatchOption.Builder watchOptionBuilder = + WatchOption.newBuilder().withPrefix(buildRegistryKeyPrefix(cluster)); Pair<Long /*revision*/, List<InetSocketAddress>> addressPair = clusterAddressMap.get(cluster); if (Objects.nonNull(addressPair)) { // Maybe addressPair isn't newest now, but it's ok watchOptionBuilder.withRevision(addressPair.getKey()); } - this.watcher = watchClient.watch(buildRegistryKeyPrefix(cluster), watchOptionBuilder.build(), this.listener); + this.watcher = + watchClient.watch(buildRegistryKeyPrefix(cluster), watchOptionBuilder.build(), this.listener); } /** * stop this task */ public void stop() { - this.watcher.close(); + if (this.watcher != null) { + this.watcher.close(); + } } } diff --git a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java index f70de7609f..b3c1dcc585 100644 --- a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java +++ b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java @@ -16,101 +16,105 @@ */ package org.apache.seata.discovery.registry.etcd; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KV; +import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.Lease; import io.etcd.jetcd.Watch; -import io.etcd.jetcd.ByteSequence; -import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.api.RangeResponse; import io.etcd.jetcd.api.ResponseHeader; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.lease.LeaseGrantResponse; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.lease.LeaseTimeToLiveResponse; - import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.options.WatchOption; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.seata.config.Configuration; import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.config.exception.ConfigNotFoundException; import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider; import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl; -import org.junit.jupiter.api.MethodOrderer; -import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.mockito.InjectMocks; -import org.mockito.Mock; - -import java.lang.reflect.Field; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.verify; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class EtcdRegistryServiceImplMockTest { - @InjectMocks - private EtcdRegistryServiceImpl registryService; @Mock private Client mockClient; + @Mock private KV mockKVClient; + @Mock private Lease mockLeaseClient; + @Mock private Watch mockWatchClient; + @Mock private Watch.Watcher mockWatcher; + @Mock Configuration configuration; - ExecutorService executorService; + private EtcdRegistryServiceImpl registryService; + private ExecutorService executorService; - private final static String HOST = "127.0.0.1"; - private final static int PORT = 8091; + private static final String HOST = "127.0.0.1"; + private static final int PORT = 8091; private static final String CLUSTER_NAME = "default"; @BeforeEach public void setUp() throws NoSuchFieldException, IllegalAccessException { MockitoAnnotations.openMocks(this); registryService = (EtcdRegistryServiceImpl) spy(new EtcdRegistryProvider().provide()); + // mock client when(mockClient.getLeaseClient()).thenReturn(mockLeaseClient); when(mockClient.getWatchClient()).thenReturn(mockWatchClient); when(mockClient.getKVClient()).thenReturn(mockKVClient); + // inject spy executorService Field executorServiceField = EtcdRegistryServiceImpl.class.getDeclaredField("executorService"); executorServiceField.setAccessible(true); executorService = spy((ExecutorService) executorServiceField.get(registryService)); executorServiceField.set(registryService, executorService); + // inject mock client Field clientField = EtcdRegistryServiceImpl.class.getDeclaredField("client"); clientField.setAccessible(true); @@ -132,12 +136,15 @@ public class EtcdRegistryServiceImplMockTest { public void testRegister() throws Exception { long leaseId = 1L; InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091); + // Mock lease grant response LeaseGrantResponse leaseGrantResponse = mock(LeaseGrantResponse.class); when(leaseGrantResponse.getID()).thenReturn(leaseId); when(mockLeaseClient.grant(anyLong())).thenReturn(CompletableFuture.completedFuture(leaseGrantResponse)); + // Mock put response when(mockKVClient.put(any(), any(), any(PutOption.class))).thenReturn(CompletableFuture.completedFuture(null)); + // timeToLive response io.etcd.jetcd.api.LeaseTimeToLiveResponse timeToLiveResponseApi = io.etcd.jetcd.api.LeaseTimeToLiveResponse.newBuilder() @@ -146,14 +153,19 @@ public class EtcdRegistryServiceImplMockTest { .build(); when(mockLeaseClient.timeToLive(eq(leaseId), any())) .thenReturn(CompletableFuture.completedFuture(new LeaseTimeToLiveResponse(timeToLiveResponseApi))); + // keepAlive response - io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse = io.etcd.jetcd.api.LeaseKeepAliveResponse.newBuilder().build(); + io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse = + io.etcd.jetcd.api.LeaseKeepAliveResponse.newBuilder().build(); when(mockLeaseClient.keepAliveOnce(eq(leaseId))) .thenReturn(CompletableFuture.completedFuture(new LeaseKeepAliveResponse(leaseKeepAliveResponse))); + // Act registryService.register(address); + // verify the method to register the new service is called verify(mockKVClient, times(1)).put(any(), any(), any(PutOption.class)); + // verify lifeKeeper task is submitted verify(executorService, times(1)).submit(any(Callable.class)); } @@ -161,10 +173,13 @@ public class EtcdRegistryServiceImplMockTest { @Test public void testUnregister() throws Exception { InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091); + // Mock delete response when(mockKVClient.delete(any())).thenReturn(CompletableFuture.completedFuture(null)); + // Act registryService.unregister(address); + // Verify verify(mockKVClient, times(1)).delete(any()); } @@ -174,16 +189,19 @@ public class EtcdRegistryServiceImplMockTest { public void testLookup() throws Exception { List<String> services = Arrays.asList("127.0.0.1:8091", "127.0.0.1:8092", "127.0.0.1:8093"); GetResponse mockGetResponse = createMockGetResponse(services); - when(mockKVClient.get(any(ByteSequence.class), any(GetOption.class))).thenReturn(CompletableFuture.completedFuture(mockGetResponse)); + when(mockKVClient.get(any(ByteSequence.class), any(GetOption.class))) + .thenReturn(CompletableFuture.completedFuture(mockGetResponse)); try (MockedStatic<ConfigurationFactory> mockConfig = Mockito.mockStatic(ConfigurationFactory.class)) { // 1. run success case mockConfig.when(ConfigurationFactory::getInstance).thenReturn(configuration); - when(configuration.getConfig("service.vgroupMapping.default_tx_group")).thenReturn(CLUSTER_NAME); + when(configuration.getConfig("service.vgroupMapping.default_tx_group")) + .thenReturn(CLUSTER_NAME); List<InetSocketAddress> lookup = registryService.lookup(DEFAULT_TX_GROUP); List<String> lookupServices = lookup.stream() .map(address -> address.getHostString() + ":" + address.getPort()) .collect(Collectors.toList()); + // assert assertEquals(lookupServices, services); @@ -197,7 +215,9 @@ public class EtcdRegistryServiceImplMockTest { private GetResponse createMockGetResponse(List<String> addresses) { // Create mock ResponseHeader - ResponseHeader mockHeader = ResponseHeader.newBuilder().setRevision(12345L).build(); + ResponseHeader mockHeader = + ResponseHeader.newBuilder().setRevision(12345L).build(); + // Create mock KeyValue list List<KeyValue> mockKeyValues = addresses.stream() .map(address -> { @@ -206,20 +226,23 @@ public class EtcdRegistryServiceImplMockTest { return mockKeyValue; }) .collect(Collectors.toList()); + // Create mock RangeResponse - RangeResponse mockRangeResponse = RangeResponse.newBuilder().setHeader(mockHeader).build(); + RangeResponse mockRangeResponse = + RangeResponse.newBuilder().setHeader(mockHeader).build(); + // Create mock GetResponse GetResponse mockGetResponse = spy(new GetResponse(mockRangeResponse, ByteSequence.EMPTY)); when(mockGetResponse.getKvs()).thenReturn(mockKeyValues); return mockGetResponse; } - @Test public void testSubscribe() throws Exception { Watch.Listener mockListener = mock(Watch.Listener.class); registryService.subscribe(CLUSTER_NAME, mockListener); - //verify watcher task is submitted + + // verify watcher task is submitted verify(executorService, times(1)).submit(any(Runnable.class)); } @@ -227,15 +250,17 @@ public class EtcdRegistryServiceImplMockTest { public void testUnsubscribe() throws Exception { Watch.Listener mockListener = mock(Watch.Listener.class); CountDownLatch latch = new CountDownLatch(1); + when(mockWatchClient.watch(any(), any(WatchOption.class), any(Watch.Listener.class))) .thenAnswer(invocation -> { latch.countDown(); return mockWatcher; }); + registryService.subscribe(DEFAULT_TX_GROUP, mockListener); latch.await(1, TimeUnit.SECONDS); + registryService.unsubscribe(DEFAULT_TX_GROUP, mockListener); + assertEquals(0, latch.getCount(), "Latch should be 0"); } - - } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org