This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 48413303752 Add ReservationPersistService to reserve worker id for ClusterWorkerIdGenerator (#31981)
48413303752 is described below

commit 48413303752bb1f6853e9890fd7524c9993531ba
Author: Haoran Meng <[email protected]>
AuthorDate: Thu Jul 4 17:35:49 2024 +0800

    Add ReservationPersistService to reserve worker id for ClusterWorkerIdGenerator (#31981)
---
 .../metadata/persist/node/ReservationNode.java     |  6 +--
 .../listener/MetaDataChangedListener.java          |  4 +-
 .../ComputeNodeStateDispatchEventBuilder.java      |  4 +-
 .../dispatch/CacheEvictedSubscriber.java           |  2 +-
 .../workerid/ClusterWorkerIdGenerator.java         | 17 +++-----
 .../cluster/persist/ReservationPersistService.java | 49 ++++++++++++++++++++++
 ...ationNodeTest.java => ReservationNodeTest.java} |  5 ++-
 .../dispatch/CacheEvictedSubscriberTest.java       |  2 +-
 8 files changed, 67 insertions(+), 22 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
similarity index 89%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
rename to 
kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
index 0cbfa6b6127..44611bdae95 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node;
+package org.apache.shardingsphere.metadata.persist.node;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
 /**
- * Worker id reservation node.
+ * Reservation node.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class WorkerIdReservationNode {
+public final class ReservationNode {
     
     private static final String ROOT_NODE = "reservation";
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
index ecdc89b566a..18ac369a014 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
@@ -56,10 +56,10 @@ public final class MetaDataChangedListener implements 
DataChangedEventListener {
     
     @Override
     public void onChange(final DataChangedEvent event) {
-        createGovernanceEvent(event).ifPresent(eventBusContext::post);
+        createDispatchEvent(event).ifPresent(eventBusContext::post);
     }
     
-    private Optional<DispatchEvent> createGovernanceEvent(final 
DataChangedEvent event) {
+    private Optional<DispatchEvent> createDispatchEvent(final DataChangedEvent 
event) {
         String key = event.getKey();
         Optional<String> databaseName = 
DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
         Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(key);
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
index 4c21fb1a2b4..7c334108657 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
@@ -58,7 +58,7 @@ public final class ComputeNodeStateDispatchEventBuilder 
implements DispatchEvent
     public Optional<DispatchEvent> build(final DataChangedEvent event) {
         String instanceId = 
ComputeNode.getInstanceIdByComputeNode(event.getKey());
         if (!Strings.isNullOrEmpty(instanceId)) {
-            Optional<DispatchEvent> result = 
createInstanceGovernanceEvent(event, instanceId);
+            Optional<DispatchEvent> result = 
createInstanceDispatchEvent(event, instanceId);
             if (result.isPresent()) {
                 return result;
             }
@@ -73,7 +73,7 @@ public final class ComputeNodeStateDispatchEventBuilder 
implements DispatchEvent
     }
     
     @SuppressWarnings("unchecked")
-    private Optional<DispatchEvent> createInstanceGovernanceEvent(final 
DataChangedEvent event, final String instanceId) {
+    private Optional<DispatchEvent> createInstanceDispatchEvent(final 
DataChangedEvent event, final String instanceId) {
         if 
(event.getKey().equals(ComputeNode.getComputeNodeStateNodePath(instanceId)) && 
Type.DELETED != event.getType()) {
             return Optional.of(new 
ComputeNodeInstanceStateChangedEvent(instanceId, event.getValue()));
         }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
index 142bb48a91d..a64b6a4d800 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
@@ -34,7 +34,7 @@ public final class CacheEvictedSubscriber implements 
EventSubscriber {
      * @param ignored unused
      */
     @Subscribe
-    public void onGovernanceEvent(final DispatchEvent ignored) {
+    public void cleanCache(final DispatchEvent ignored) {
         OrderedServicesCache.clearCache();
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
index dd449f523dd..b2d2a11908c 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
@@ -22,10 +22,9 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
 import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdReservationNode;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+import 
org.apache.shardingsphere.mode.manager.cluster.persist.ReservationPersistService;
 import 
org.apache.shardingsphere.mode.persist.service.ComputeNodePersistService;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.util.Collection;
 import java.util.Optional;
@@ -41,18 +40,18 @@ import java.util.stream.IntStream;
 @Slf4j
 public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
     
-    private final ClusterPersistRepository repository;
-    
     private final String instanceId;
     
     private final ComputeNodePersistService computeNodePersistService;
     
+    private final ReservationPersistService reservationPersistService;
+    
     private final AtomicBoolean isWarned = new AtomicBoolean(false);
     
     public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, 
final String instanceId) {
-        this.repository = repository;
         this.instanceId = instanceId;
         computeNodePersistService = new ComputeNodePersistService(repository);
+        reservationPersistService = new ReservationPersistService(repository);
     }
     
     @Override
@@ -82,11 +81,7 @@ public final class ClusterWorkerIdGenerator implements 
WorkerIdGenerator {
         PriorityQueue<Integer> availableWorkerIds = IntStream.range(0, 
1024).boxed().filter(each -> 
!assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
         Integer preselectedWorkerId = availableWorkerIds.poll();
         Preconditions.checkNotNull(preselectedWorkerId, "Preselected worker-id 
can not be null.");
-        try {
-            return 
repository.persistExclusiveEphemeral(WorkerIdReservationNode.getWorkerIdReservationPath(preselectedWorkerId),
 instanceId) ? Optional.of(preselectedWorkerId) : Optional.empty();
-        } catch (final ClusterPersistRepositoryException ignore) {
-            return Optional.empty();
-        }
+        return reservationPersistService.reserveWorkerId(preselectedWorkerId, 
instanceId);
     }
     
     private void logWarning(final int generatedWorkerId, final Properties 
props) {
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
new file mode 100644
index 00000000000..90058e37eb1
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.persist;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.metadata.persist.node.ReservationNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+
+import java.util.Optional;
+
+/**
+ * Reservation persist service.
+ */
+@RequiredArgsConstructor
+public final class ReservationPersistService {
+    
+    private final ClusterPersistRepository repository;
+    
+    /**
+     * Reserve worker ID.
+     *
+     * @param preselectedWorkerId preselected worker ID
+     * @param instanceId instance ID
+     * @return reserved worker ID, or empty if the reservation was not persisted or the repository threw an exception
+     */
+    public Optional<Integer> reserveWorkerId(final Integer 
preselectedWorkerId, final String instanceId) {
+        try {
+            return 
repository.persistExclusiveEphemeral(ReservationNode.getWorkerIdReservationPath(preselectedWorkerId),
 instanceId) ? Optional.of(preselectedWorkerId) : Optional.empty();
+        } catch (final ClusterPersistRepositoryException ignore) {
+            return Optional.empty();
+        }
+    }
+}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
similarity index 84%
rename from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
rename to 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
index 07ec342f810..c569f535065 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
@@ -17,15 +17,16 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node;
 
+import org.apache.shardingsphere.metadata.persist.node.ReservationNode;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class WorkerIdReservationNodeTest {
+class ReservationNodeTest {
     
     @Test
     void assertGetWorkerIdReservationPath() {
-        assertThat(WorkerIdReservationNode.getWorkerIdReservationPath(1), 
is("/reservation/worker_id/1"));
+        assertThat(ReservationNode.getWorkerIdReservationPath(1), 
is("/reservation/worker_id/1"));
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
index b81437212b3..db0a4c2b025 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
@@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 class CacheEvictedSubscriberTest {
     
     @Test
-    void assertOnGovernanceEvent() {
+    void assertClearCache() {
         EventBusContext eventBusContext = new EventBusContext();
         eventBusContext.register(new CacheEvictedSubscriber());
         OrderedServicesCache.cacheServices(getClass(), 
Collections.emptyList(), Collections.emptyMap());

Reply via email to