This is an automated email from the ASF dual-hosted git repository.
linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 39212eef673 Refactor ClusterWorkerIdGenerator (#31319)
39212eef673 is described below
commit 39212eef673ae82e9525a50a10c17af7fcbab9b2
Author: Liang Zhang <[email protected]>
AuthorDate: Tue May 21 01:43:45 2024 +0800
Refactor ClusterWorkerIdGenerator (#31319)
* Refactor ClusterWorkerIdGenerator
* Refactor ClusterWorkerIdGenerator
* Refactor ClusterWorkerIdGenerator
* Refactor ClusterWorkerIdGenerator
* Refactor ClusterWorkerIdGenerator
---
.../infra/instance/workerid/WorkerIdGenerator.java | 2 +-
.../workerid/generator/ClusterWorkerIdGenerator.java | 18 ++++++------------
.../registry/workerid/node/WorkerIdNode.java | 4 ++--
.../registry/workerid/node/WorkerIdNodeTest.java | 2 +-
.../repository/cluster/ClusterPersistRepository.java | 2 +-
.../generator/StandaloneWorkerIdGeneratorTest.java | 3 ++-
6 files changed, 13 insertions(+), 18 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
index 03c25011019..65d72325906 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
@@ -31,7 +31,7 @@ public interface WorkerIdGenerator {
/**
* Generate worker ID.
*
- * @param props props
+ * @param props properties
* @return worker ID
*/
int generate(Properties props);
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 26dbd5d86be..9d803b5de4c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -28,11 +28,12 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Worker ID generator for cluster mode.
@@ -78,18 +79,11 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
private Optional<Integer> generateAvailableWorkerId() {
Collection<Integer> assignedWorkerIds =
computeNodeStatusService.getAssignedWorkerIds();
ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <=
MAX_WORKER_ID + 1, WorkerIdAssignedException::new);
- Collection<Integer> availableWorkerIds = new LinkedList<>();
- for (int i = 0; i < 1024; i++) {
- availableWorkerIds.add(i);
- }
- PriorityQueue<Integer> priorityQueue = new
PriorityQueue<>(availableWorkerIds);
- for (Integer each : assignedWorkerIds) {
- priorityQueue.remove(each);
- }
- Integer preselectedWorkerId = priorityQueue.poll();
- Preconditions.checkState(null != preselectedWorkerId, "Preselected
worker-id can not be null.");
+ PriorityQueue<Integer> availableWorkerIds = IntStream.range(0,
1024).boxed().filter(each ->
!assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
+ Integer preselectedWorkerId = availableWorkerIds.poll();
+ Preconditions.checkNotNull(preselectedWorkerId, "Preselected worker-id
can not be null.");
try {
-
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()),
instanceId);
+
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId),
instanceId);
return Optional.of(preselectedWorkerId);
} catch (final ClusterPersistRepositoryException ignore) {
return Optional.empty();
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNode.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNode.java
index aa35e35a075..00264b9dc70 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNode.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNode.java
@@ -34,7 +34,7 @@ public final class WorkerIdNode {
* @param workerId worker id
* @return worker id generator path
*/
- public static String getWorkerIdGeneratorPath(final String workerId) {
- return String.join("/", "", ROOT_NODE, workerId);
+ public static String getWorkerIdGeneratorPath(final int workerId) {
+ return String.join("/", "", ROOT_NODE, String.valueOf(workerId));
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNodeTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNodeTest.java
index 8eb7cfb6d3e..313a9dcc297 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNodeTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdNodeTest.java
@@ -26,6 +26,6 @@ class WorkerIdNodeTest {
@Test
void assertGetWorkerIdGeneratorPath() {
- assertThat(WorkerIdNode.getWorkerIdGeneratorPath("1"),
is("/worker_id/1"));
+ assertThat(WorkerIdNode.getWorkerIdGeneratorPath(1),
is("/worker_id/1"));
}
}
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index cb921c52448..77cebe72341 100644
---
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -45,7 +45,7 @@ public interface ClusterPersistRepository extends
PersistRepository {
* Persist exclusive ephemeral data.
*
* @param key key of data
- * @param value is persisted or not
+ * @param value value of data
*/
void persistExclusiveEphemeral(String key, String value);
diff --git
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGeneratorTest.java
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGeneratorTest.java
index 625e993d403..40d112778e9 100644
---
a/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGeneratorTest.java
+++
b/mode/type/standalone/core/src/test/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGeneratorTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.standalone.workerid.generator;
+import
org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
@@ -42,6 +43,6 @@ class StandaloneWorkerIdGeneratorTest {
@Test
void assertGenerateWithInvalidProperties() {
- assertThrows(IllegalStateException.class, () -> new
StandaloneWorkerIdGenerator().generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "1024"))));
+ assertThrows(WorkerIdAssignedException.class, () -> new
StandaloneWorkerIdGenerator().generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "1024"))));
}
}