This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 08d1219f9e3 Add test cases on ClusterWorkerIdGenerator (#32861)
08d1219f9e3 is described below
commit 08d1219f9e38066121e1751945b629a37753dae1
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Sep 14 15:01:52 2024 +0800
Add test cases on ClusterWorkerIdGenerator (#32861)
* Add test cases on ClusterWorkerIdGenerator
* Add test cases on ClusterWorkerIdGenerator
---
.../cluster/workerid/ClusterWorkerIdGenerator.java | 9 +--
.../workerid/ClusterWorkerIdGeneratorTest.java | 75 +++++++++++++++++++---
.../core/src/test/resources/logback-test.xml | 1 +
3 files changed, 71 insertions(+), 14 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGenerator.java
index 0e8231e54f5..77cd8c8b5f4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGenerator.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGenerator.java
@@ -56,15 +56,11 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
@Override
public int generate(final Properties props) {
- int result =
loadExistedWorkerId().orElseGet(this::generateNewWorkerId);
+ int result =
computeNodePersistService.loadInstanceWorkerId(instanceId).orElseGet(this::generateNewWorkerId);
logWarning(result, props);
return result;
}
- private Optional<Integer> loadExistedWorkerId() {
- return computeNodePersistService.loadInstanceWorkerId(instanceId);
- }
-
private int generateNewWorkerId() {
Optional<Integer> generatedWorkId;
do {
@@ -78,7 +74,8 @@ public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
private Optional<Integer> generateAvailableWorkerId() {
Collection<Integer> assignedWorkerIds =
computeNodePersistService.getAssignedWorkerIds();
ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <=
MAX_WORKER_ID + 1, WorkerIdAssignedException::new);
- PriorityQueue<Integer> availableWorkerIds = IntStream.range(0,
1024).boxed().filter(each ->
!assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
+ PriorityQueue<Integer> availableWorkerIds = IntStream.range(0,
MAX_WORKER_ID + 1)
+ .boxed().filter(each ->
!assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
Integer preselectedWorkerId = availableWorkerIds.poll();
Preconditions.checkNotNull(preselectedWorkerId, "Preselected worker-id
can not be null.");
return reservationPersistService.reserveWorkerId(preselectedWorkerId,
instanceId);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGeneratorTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGeneratorTest.java
index 1ae951c51b4..7e60a27ad16 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGeneratorTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/workerid/ClusterWorkerIdGeneratorTest.java
@@ -17,34 +17,93 @@
package org.apache.shardingsphere.mode.manager.cluster.workerid;
+import lombok.SneakyThrows;
+import
org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import
org.apache.shardingsphere.mode.manager.cluster.persist.ReservationPersistService;
+import
org.apache.shardingsphere.mode.persist.service.ComputeNodePersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.stubbing.Answer;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.doAnswer;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+@ExtendWith(MockitoExtension.class)
class ClusterWorkerIdGeneratorTest {
+ private ClusterWorkerIdGenerator workerIdGenerator;
+
+ @Mock
+ private ComputeNodePersistService computeNodePersistService;
+
+ @Mock
+ private ReservationPersistService reservationPersistService;
+
+ @BeforeEach
+ void setUp() {
+ workerIdGenerator = new
ClusterWorkerIdGenerator(mock(ClusterPersistRepository.class), "foo_id");
+ setComputeNodePersistService();
+ setReservationPersistService();
+ }
+
@Test
void assertGenerateWithExistedWorkerId() {
- ClusterPersistRepository repository =
mock(ClusterPersistRepository.class);
-
when(repository.query("/nodes/compute_nodes/worker_id/foo_id")).thenReturn("10");
- assertThat(new ClusterWorkerIdGenerator(repository,
"foo_id").generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
+
when(computeNodePersistService.loadInstanceWorkerId("foo_id")).thenReturn(Optional.of(10));
+ assertThat(workerIdGenerator.generate(new Properties()), is(10));
}
+ @SuppressWarnings("unchecked")
@Test
void assertGenerateWithoutExistedWorkerId() {
- ClusterPersistRepository repository =
mock(ClusterPersistRepository.class);
- doAnswer((Answer<Object>) invocation ->
Boolean.TRUE).when(repository).persistExclusiveEphemeral("/reservation/worker_id/0",
"foo_id");
- assertThat(new ClusterWorkerIdGenerator(repository,
"foo_id").generate(new Properties()), is(0));
+
when(computeNodePersistService.getAssignedWorkerIds()).thenReturn(Collections.singleton(0));
+ when(reservationPersistService.reserveWorkerId(1,
"foo_id")).thenReturn(Optional.empty(), Optional.of(1));
+ assertThat(workerIdGenerator.generate(new Properties()), is(1));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ void assertGenerateWithoutExistedWorkerIdFailed() {
+ Collection<Integer> mockedAssignedWorkerIds = mock(Collection.class);
+ when(mockedAssignedWorkerIds.size()).thenReturn(Integer.MAX_VALUE);
+
when(computeNodePersistService.getAssignedWorkerIds()).thenReturn(mockedAssignedWorkerIds);
+ assertThrows(WorkerIdAssignedException.class, () ->
workerIdGenerator.generate(new Properties()));
+ }
+
+ @Test
+ void assertGenerateWorkerIdWithWarnLog() {
+
when(computeNodePersistService.loadInstanceWorkerId("foo_id")).thenReturn(Optional.of(10));
+ assertThat(workerIdGenerator.generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "100"))), is(10));
+ assertThat(workerIdGenerator.generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "100"))), is(10));
+ }
+
+ private void setComputeNodePersistService() {
+ setField("computeNodePersistService", computeNodePersistService);
+ }
+
+ private void setReservationPersistService() {
+ setField("reservationPersistService", reservationPersistService);
+ }
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ private void setField(final String fieldName, final Object fieldValue) {
+ Field field = workerIdGenerator.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(workerIdGenerator, fieldValue);
+ field.setAccessible(false);
}
}
diff --git a/mode/type/cluster/core/src/test/resources/logback-test.xml b/mode/type/cluster/core/src/test/resources/logback-test.xml
index 3d2bfedbd4f..5c547836335 100644
--- a/mode/type/cluster/core/src/test/resources/logback-test.xml
+++ b/mode/type/cluster/core/src/test/resources/logback-test.xml
@@ -28,6 +28,7 @@
</logger>
<logger name="org.apache.shardingsphere.schedule.core.ModeScheduleContext"
level="error" />
<logger
name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService"
level="off" />
+ <logger
name="org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator"
level="off" />
<root>
<level value="error" />