This is an automated email from the ASF dual-hosted git repository.

zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c742764f202 Refactor ClusterWorkerIdGenerator (#31255)
c742764f202 is described below

commit c742764f2023ca3ba0d09f36931437efa8891106
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 17 03:17:25 2024 +0800

    Refactor ClusterWorkerIdGenerator (#31255)
---
 .../infra/instance/workerid/WorkerIdGenerator.java |  2 +-
 .../cluster/ClusterContextManagerBuilder.java      | 11 ++---
 .../cluster/coordinator/RegistryCenter.java        |  5 +--
 .../subscriber/ProcessListChangedSubscriber.java   | 10 ++---
 .../compute/service/ComputeNodeStatusService.java  |  7 ++--
 .../generator/ClusterWorkerIdGenerator.java        | 48 +++++++++++++---------
 .../subscriber/ClusterEventSubscriberRegistry.java |  4 +-
 .../subscriber/StateChangedSubscriber.java         | 14 ++++---
 .../ProcessListChangedSubscriberTest.java          |  9 ++--
 .../generator/ClusterWorkerIdGeneratorTest.java    | 16 ++------
 .../subscriber/StateChangedSubscriberTest.java     |  3 +-
 .../generator/StandaloneWorkerIdGenerator.java     |  2 +-
 12 files changed, 64 insertions(+), 67 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
index f0f9b676e29..bc6681cced5 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.infra.instance.workerid;
 import java.util.Properties;
 
 /**
- * Worker id generator.
+ * Worker ID generator.
  */
 public interface WorkerIdGenerator {
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 2f6f3c59ac8..7e6daee7e26 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -36,6 +36,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metad
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStatusSubscriber;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
@@ -62,7 +63,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     public ContextManager build(final ContextManagerBuilderParameter param, 
final EventBusContext eventBusContext) throws SQLException {
         ClusterPersistRepository repository = 
getClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
param.getModeConfiguration().getRepository());
         RegistryCenter registryCenter = new RegistryCenter(eventBusContext, 
repository, param.getInstanceMetaData(), param.getDatabaseConfigs());
-        InstanceContext instanceContext = buildInstanceContext(registryCenter, 
param, eventBusContext);
+        InstanceContext instanceContext = buildInstanceContext(repository, 
param, eventBusContext);
         if (registryCenter.getRepository() instanceof InstanceContextAware) {
             ((InstanceContextAware) 
registryCenter.getRepository()).setInstanceContext(instanceContext);
         }
@@ -83,9 +84,9 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         return result;
     }
     
-    private InstanceContext buildInstanceContext(final RegistryCenter 
registryCenter, final ContextManagerBuilderParameter param, final 
EventBusContext eventBusContext) {
-        return new InstanceContext(new 
ComputeNodeInstance(param.getInstanceMetaData()), new 
ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()), 
param.getModeConfiguration(),
-                new ClusterModeContextManager(), new GlobalLockContext(new 
GlobalLockPersistService(initDistributedLockHolder(registryCenter.getRepository()))),
 eventBusContext);
+    private InstanceContext buildInstanceContext(final 
ClusterPersistRepository repository, final ContextManagerBuilderParameter 
param, final EventBusContext eventBusContext) {
+        return new InstanceContext(new 
ComputeNodeInstance(param.getInstanceMetaData()), new 
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData()), 
param.getModeConfiguration(),
+                new ClusterModeContextManager(), new GlobalLockContext(new 
GlobalLockPersistService(initDistributedLockHolder(repository))), 
eventBusContext);
     }
     
     private DistributedLockHolder initDistributedLockHolder(final 
ClusterPersistRepository repository) {
@@ -109,7 +110,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     private void registerOnline(final RegistryCenter registryCenter, final 
ContextManagerBuilderParameter param, final ContextManager contextManager) {
         
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
         
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
-        
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
+        
contextManager.getInstanceContext().getAllClusterInstances().addAll(new 
ComputeNodeStatusService(registryCenter.getRepository()).loadAllComputeNodeInstances());
         new ClusterEventSubscriberRegistry(contextManager, 
registryCenter).register();
     }
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 34ae942497e..677e115f4d6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -41,9 +41,6 @@ public final class RegistryCenter {
     
     private final Map<String, DatabaseConfiguration> databaseConfigs;
     
-    @Getter
-    private final ComputeNodeStatusService computeNodeStatusService;
-    
     private final GovernanceWatcherFactory listenerFactory;
     
     public RegistryCenter(final EventBusContext eventBusContext,
@@ -51,7 +48,6 @@ public final class RegistryCenter {
         this.repository = repository;
         this.instanceMetaData = instanceMetaData;
         this.databaseConfigs = databaseConfigs;
-        computeNodeStatusService = new ComputeNodeStatusService(repository);
         listenerFactory = new GovernanceWatcherFactory(repository, 
eventBusContext, getJDBCDatabaseName());
     }
     
@@ -65,6 +61,7 @@ public final class RegistryCenter {
      * @param computeNodeInstance compute node instance
      */
     public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+        ComputeNodeStatusService computeNodeStatusService = new 
ComputeNodeStatusService(repository);
         
computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
         
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(),
 computeNodeInstance.getLabels());
         
computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(),
 computeNodeInstance.getState());
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 86172a33363..9ce2a4ec9e8 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -48,7 +48,7 @@ public final class ProcessListChangedSubscriber implements 
EventSubscriber {
     
     private final ContextManager contextManager;
     
-    private final RegistryCenter registryCenter;
+    private final ClusterPersistRepository repository;
     
     private final YamlProcessListSwapper swapper = new 
YamlProcessListSwapper();
     
@@ -64,10 +64,10 @@ public final class ProcessListChangedSubscriber implements 
EventSubscriber {
         }
         Collection<Process> processes = 
ProcessRegistry.getInstance().listAll();
         if (!processes.isEmpty()) {
-            registryCenter.getRepository().persist(
+            repository.persist(
                     ProcessNode.getProcessListInstancePath(event.getTaskId(), 
event.getInstanceId()), 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
         }
-        
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
 event.getTaskId()));
+        
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
 event.getTaskId()));
     }
     
     /**
@@ -98,7 +98,7 @@ public final class ProcessListChangedSubscriber implements 
EventSubscriber {
                 each.cancel();
             }
         }
-        
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
+        
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 96488f881f1..4896bbbd27f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -21,17 +21,16 @@ import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.instance.ComputeNodeData;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import 
org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataFactory;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.instance.ComputeNodeData;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -84,7 +83,7 @@ public final class ComputeNodeStatusService {
      * @param instanceId instance id
      * @param workerId worker id
      */
-    public void persistInstanceWorkerId(final String instanceId, final Integer 
workerId) {
+    public void persistInstanceWorkerId(final String instanceId, final int 
workerId) {
         
repository.persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
 String.valueOf(workerId));
     }
     
@@ -97,7 +96,7 @@ public final class ComputeNodeStatusService {
     @SuppressWarnings("unchecked")
     public Collection<String> loadInstanceLabels(final String instanceId) {
         String yamlContent = 
repository.getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
-        return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() : 
YamlEngine.unmarshal(yamlContent, Collection.class);
+        return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() : 
YamlEngine.unmarshal(yamlContent, Collection.class);
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 8fc242f2b9c..ee21caf4812 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -18,14 +18,14 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;
 
 import com.google.common.base.Preconditions;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.exception.WorkIdAssignedException;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
 
 import java.util.Collection;
@@ -33,39 +33,47 @@ import java.util.LinkedList;
 import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Cluster worker id generator for cluster mode.
+ * Worker ID generator for cluster mode.
  */
-@RequiredArgsConstructor
 @Slf4j
 public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
     
-    private final RegistryCenter registryCenter;
+    private final ClusterPersistRepository repository;
     
     private final InstanceMetaData instanceMetaData;
     
-    private volatile boolean isWarned;
+    private final ComputeNodeStatusService computeNodeStatusService;
+    
+    private final AtomicBoolean isWarned = new AtomicBoolean(false);
+    
+    public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, 
final InstanceMetaData instanceMetaData) {
+        this.repository = repository;
+        this.instanceMetaData = instanceMetaData;
+        computeNodeStatusService = new ComputeNodeStatusService(repository);
+    }
     
     @Override
     public int generate(final Properties props) {
-        int result = 
registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
+        int result = 
computeNodeStatusService.loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
         checkIneffectiveConfiguration(result, props);
         return result;
     }
     
-    private Integer reGenerate() {
-        Optional<Integer> result;
+    private int reGenerate() {
+        Optional<Integer> generatedWorkId;
         do {
-            result = generateAvailableWorkerId();
-        } while (!result.isPresent());
-        Integer generatedWorkId = result.get();
-        
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(),
 generatedWorkId);
-        return generatedWorkId;
+            generatedWorkId = generateAvailableWorkerId();
+        } while (!generatedWorkId.isPresent());
+        int result = generatedWorkId.get();
+        
computeNodeStatusService.persistInstanceWorkerId(instanceMetaData.getId(), 
result);
+        return result;
     }
     
     private Optional<Integer> generateAvailableWorkerId() {
-        Collection<Integer> assignedWorkerIds = 
registryCenter.getComputeNodeStatusService().getAssignedWorkerIds();
+        Collection<Integer> assignedWorkerIds = 
computeNodeStatusService.getAssignedWorkerIds();
         ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <= 
1024, WorkIdAssignedException::new);
         Collection<Integer> availableWorkerIds = new LinkedList<>();
         for (int i = 0; i < 1024; i++) {
@@ -78,16 +86,16 @@ public final class ClusterWorkerIdGenerator implements 
WorkerIdGenerator {
         Integer preselectedWorkerId = priorityQueue.poll();
         Preconditions.checkState(null != preselectedWorkerId, "Preselected 
worker-id can not be null.");
         try {
-            
registryCenter.getRepository().persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()),
 instanceMetaData.getId());
+            
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()),
 instanceMetaData.getId());
             return Optional.of(preselectedWorkerId);
         } catch (final ClusterPersistRepositoryException ignore) {
             return Optional.empty();
         }
     }
     
-    private void checkIneffectiveConfiguration(final long generatedWorkerId, 
final Properties props) {
-        if (!isWarned && null != props && props.containsKey(WORKER_ID_KEY)) {
-            isWarned = true;
+    private void checkIneffectiveConfiguration(final int generatedWorkerId, 
final Properties props) {
+        if (!isWarned.get() && null != props && 
props.containsKey(WORKER_ID_KEY)) {
+            isWarned.set(true);
             log.warn("No need to configured {} in cluster mode, system 
assigned {} was {}", WORKER_ID_KEY, WORKER_ID_KEY, generatedWorkerId);
         }
     }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index 67c9d3e90ea..3a0ee8062ed 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -35,8 +35,8 @@ public final class ClusterEventSubscriberRegistry extends 
EventSubscriberRegistr
                 new ConfigurationChangedSubscriber(contextManager),
                 new ResourceMetaDataChangedSubscriber(contextManager),
                 new DatabaseChangedSubscriber(contextManager),
-                new StateChangedSubscriber(contextManager, registryCenter),
-                new ProcessListChangedSubscriber(contextManager, 
registryCenter),
+                new StateChangedSubscriber(contextManager, 
registryCenter.getRepository()),
+                new ProcessListChangedSubscriber(contextManager, 
registryCenter.getRepository()),
                 new CacheEvictedSubscriber());
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 9a87ffb3c92..296fadf72ee 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
@@ -27,7 +26,6 @@ import 
org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
@@ -36,18 +34,24 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
  * State changed subscriber.
  */
-@RequiredArgsConstructor
 @SuppressWarnings("unused")
 public final class StateChangedSubscriber implements EventSubscriber {
     
     private final ContextManager contextManager;
     
-    private final RegistryCenter registryCenter;
+    private final ComputeNodeStatusService computeNodeStatusService;
+    
+    public StateChangedSubscriber(final ContextManager contextManager, final 
ClusterPersistRepository repository) {
+        this.contextManager = contextManager;
+        computeNodeStatusService = new ComputeNodeStatusService(repository);
+    }
     
     /**
      * Renew disabled data source names.
@@ -125,7 +129,7 @@ public final class StateChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final InstanceOnlineEvent event) {
-        
contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+        
contextManager.getInstanceContext().addComputeNodeInstance(computeNodeStatusService.loadComputeNodeInstance(event.getInstanceMetaData()));
     }
     
     /**
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index c9f7ccd6ba0..ac7cb7ae419 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
@@ -74,7 +73,8 @@ class ProcessListChangedSubscriberTest {
     
     private ContextManager contextManager;
     
-    private RegistryCenter registryCenter;
+    @Mock
+    private ClusterPersistRepository repository;
     
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ShardingSphereDatabase database;
@@ -86,8 +86,7 @@ class ProcessListChangedSubscriberTest {
         contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
                 
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
                 new ConfigurationProperties(new Properties()))));
-        registryCenter = new RegistryCenter(eventBusContext, 
mock(ClusterPersistRepository.class), mock(ProxyInstanceMetaData.class), null);
-        subscriber = new ProcessListChangedSubscriber(contextManager, 
registryCenter);
+        subscriber = new ProcessListChangedSubscriber(contextManager, 
repository);
     }
     
     private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
@@ -118,7 +117,6 @@ class ProcessListChangedSubscriberTest {
         ProcessRegistry.getInstance().add(process);
         String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
         subscriber.reportLocalProcesses(new 
ReportLocalProcessesEvent(instanceId, processId));
-        ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
                 "processes:" + System.lineSeparator() + "- completedUnitCount: 
0" + System.lineSeparator()
                         + "  id: foo_id" + System.lineSeparator()
@@ -148,7 +146,6 @@ class ProcessListChangedSubscriberTest {
         String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
         String processId = "foo_id";
         subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId, 
processId));
-        ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/" 
+ instanceId + ":foo_id");
     }
     
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
index 1dcdf6f1dd0..e04ab181068 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
@@ -19,20 +19,16 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
 
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.test.util.PropertiesBuilder;
 import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
 import org.junit.jupiter.api.Test;
 import org.mockito.stubbing.Answer;
 
-import java.util.Collections;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -43,21 +39,17 @@ class ClusterWorkerIdGeneratorTest {
     void assertGenerateWithExistedWorkerId() {
         InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
         when(instanceMetaData.getId()).thenReturn("foo_id");
-        RegistryCenter registryCenter = mock(RegistryCenter.class, RETURNS_DEEP_STUBS);
-        when(registryCenter.getComputeNodeStatusService().loadInstanceWorkerId("foo_id")).thenReturn(Optional.of(10));
-        assertThat(new ClusterWorkerIdGenerator(registryCenter, instanceMetaData).generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
+        ClusterPersistRepository repository = mock(ClusterPersistRepository.class);
+        when(repository.getDirectly("/nodes/compute_nodes/worker_id/foo_id")).thenReturn("10");
+        assertThat(new ClusterWorkerIdGenerator(repository, instanceMetaData).generate(PropertiesBuilder.build(new Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
     }
     
     @Test
     void assertGenerateWithoutExistedWorkerId() {
         InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
         when(instanceMetaData.getId()).thenReturn("foo_id");
-        RegistryCenter registryCenter = mock(RegistryCenter.class, RETURNS_DEEP_STUBS);
         ClusterPersistRepository repository = mock(ClusterPersistRepository.class);
         doAnswer((Answer<Object>) invocation -> "foo_id").when(repository).persistEphemeral("/worker_id/0", "foo_id");
-        when(registryCenter.getRepository()).thenReturn(repository);
-        when(registryCenter.getComputeNodeStatusService().loadInstanceWorkerId("foo_id")).thenReturn(Optional.empty());
-        when(registryCenter.getComputeNodeStatusService().getAssignedWorkerIds()).thenReturn(Collections.singleton(1));
-        assertThat(new ClusterWorkerIdGenerator(registryCenter, instanceMetaData).generate(new Properties()), is(0));
+        assertThat(new ClusterWorkerIdGenerator(repository, instanceMetaData).generate(new Properties()), is(0));
     }
 }
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 2f51892339c..c036c505d23 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
@@ -91,7 +90,7 @@ class StateChangedSubscriberTest {
         contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
                 contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
                 new ConfigurationProperties(new Properties()))));
-        subscriber = new StateChangedSubscriber(contextManager, new RegistryCenter(eventBusContext, mock(ClusterPersistRepository.class), mock(ProxyInstanceMetaData.class), null));
+        subscriber = new StateChangedSubscriber(contextManager, mock(ClusterPersistRepository.class));
     }
     
     private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
index 0fafbb17c9a..6030c1de961 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
 import java.util.Properties;
 
 /**
- * Worker id generator for standalone mode.
+ * Worker ID generator for standalone mode.
  */
 public final class StandaloneWorkerIdGenerator implements WorkerIdGenerator {
     


Reply via email to