This is an automated email from the ASF dual-hosted git repository.
zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c742764f202 Refactor ClusterWorkerIdGenerator (#31255)
c742764f202 is described below
commit c742764f2023ca3ba0d09f36931437efa8891106
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 17 03:17:25 2024 +0800
Refactor ClusterWorkerIdGenerator (#31255)
---
.../infra/instance/workerid/WorkerIdGenerator.java | 2 +-
.../cluster/ClusterContextManagerBuilder.java | 11 ++---
.../cluster/coordinator/RegistryCenter.java | 5 +--
.../subscriber/ProcessListChangedSubscriber.java | 10 ++---
.../compute/service/ComputeNodeStatusService.java | 7 ++--
.../generator/ClusterWorkerIdGenerator.java | 48 +++++++++++++---------
.../subscriber/ClusterEventSubscriberRegistry.java | 4 +-
.../subscriber/StateChangedSubscriber.java | 14 ++++---
.../ProcessListChangedSubscriberTest.java | 9 ++--
.../generator/ClusterWorkerIdGeneratorTest.java | 16 ++------
.../subscriber/StateChangedSubscriberTest.java | 3 +-
.../generator/StandaloneWorkerIdGenerator.java | 2 +-
12 files changed, 64 insertions(+), 67 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
index f0f9b676e29..bc6681cced5 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.infra.instance.workerid;
import java.util.Properties;
/**
- * Worker id generator.
+ * Worker ID generator.
*/
public interface WorkerIdGenerator {
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 2f6f3c59ac8..7e6daee7e26 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -36,6 +36,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metad
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStatusSubscriber;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
@@ -62,7 +63,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
public ContextManager build(final ContextManagerBuilderParameter param,
final EventBusContext eventBusContext) throws SQLException {
ClusterPersistRepository repository =
getClusterPersistRepository((ClusterPersistRepositoryConfiguration)
param.getModeConfiguration().getRepository());
RegistryCenter registryCenter = new RegistryCenter(eventBusContext,
repository, param.getInstanceMetaData(), param.getDatabaseConfigs());
- InstanceContext instanceContext = buildInstanceContext(registryCenter,
param, eventBusContext);
+ InstanceContext instanceContext = buildInstanceContext(repository,
param, eventBusContext);
if (registryCenter.getRepository() instanceof InstanceContextAware) {
((InstanceContextAware)
registryCenter.getRepository()).setInstanceContext(instanceContext);
}
@@ -83,9 +84,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
return result;
}
- private InstanceContext buildInstanceContext(final RegistryCenter
registryCenter, final ContextManagerBuilderParameter param, final
EventBusContext eventBusContext) {
- return new InstanceContext(new
ComputeNodeInstance(param.getInstanceMetaData()), new
ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
param.getModeConfiguration(),
- new ClusterModeContextManager(), new GlobalLockContext(new
GlobalLockPersistService(initDistributedLockHolder(registryCenter.getRepository()))),
eventBusContext);
+ private InstanceContext buildInstanceContext(final
ClusterPersistRepository repository, final ContextManagerBuilderParameter
param, final EventBusContext eventBusContext) {
+ return new InstanceContext(new
ComputeNodeInstance(param.getInstanceMetaData()), new
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData()),
param.getModeConfiguration(),
+ new ClusterModeContextManager(), new GlobalLockContext(new
GlobalLockPersistService(initDistributedLockHolder(repository))),
eventBusContext);
}
private DistributedLockHolder initDistributedLockHolder(final
ClusterPersistRepository repository) {
@@ -109,7 +110,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void registerOnline(final RegistryCenter registryCenter, final
ContextManagerBuilderParameter param, final ContextManager contextManager) {
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
-
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
+
contextManager.getInstanceContext().getAllClusterInstances().addAll(new
ComputeNodeStatusService(registryCenter.getRepository()).loadAllComputeNodeInstances());
new ClusterEventSubscriberRegistry(contextManager,
registryCenter).register();
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 34ae942497e..677e115f4d6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -41,9 +41,6 @@ public final class RegistryCenter {
private final Map<String, DatabaseConfiguration> databaseConfigs;
- @Getter
- private final ComputeNodeStatusService computeNodeStatusService;
-
private final GovernanceWatcherFactory listenerFactory;
public RegistryCenter(final EventBusContext eventBusContext,
@@ -51,7 +48,6 @@ public final class RegistryCenter {
this.repository = repository;
this.instanceMetaData = instanceMetaData;
this.databaseConfigs = databaseConfigs;
- computeNodeStatusService = new ComputeNodeStatusService(repository);
listenerFactory = new GovernanceWatcherFactory(repository,
eventBusContext, getJDBCDatabaseName());
}
@@ -65,6 +61,7 @@ public final class RegistryCenter {
* @param computeNodeInstance compute node instance
*/
public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+ ComputeNodeStatusService computeNodeStatusService = new
ComputeNodeStatusService(repository);
computeNodeStatusService.registerOnline(computeNodeInstance.getMetaData());
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getLabels());
computeNodeStatusService.persistInstanceState(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getState());
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
index 86172a33363..9ce2a4ec9e8 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriber.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.sql.SQLException;
import java.sql.Statement;
@@ -48,7 +48,7 @@ public final class ProcessListChangedSubscriber implements
EventSubscriber {
private final ContextManager contextManager;
- private final RegistryCenter registryCenter;
+ private final ClusterPersistRepository repository;
private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
@@ -64,10 +64,10 @@ public final class ProcessListChangedSubscriber implements
EventSubscriber {
}
Collection<Process> processes =
ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
- registryCenter.getRepository().persist(
+ repository.persist(
ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
-
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
+
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
}
/**
@@ -98,7 +98,7 @@ public final class ProcessListChangedSubscriber implements
EventSubscriber {
each.cancel();
}
}
-
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
+
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 96488f881f1..4896bbbd27f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -21,17 +21,16 @@ import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.instance.ComputeNodeData;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataFactory;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.instance.ComputeNodeData;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -84,7 +83,7 @@ public final class ComputeNodeStatusService {
* @param instanceId instance id
* @param workerId worker id
*/
- public void persistInstanceWorkerId(final String instanceId, final Integer
workerId) {
+ public void persistInstanceWorkerId(final String instanceId, final int
workerId) {
repository.persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(workerId));
}
@@ -97,7 +96,7 @@ public final class ComputeNodeStatusService {
@SuppressWarnings("unchecked")
public Collection<String> loadInstanceLabels(final String instanceId) {
String yamlContent =
repository.getDirectly(ComputeNode.getInstanceLabelsNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
+ return Strings.isNullOrEmpty(yamlContent) ? new LinkedList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
}
/**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 8fc242f2b9c..ee21caf4812 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -18,14 +18,14 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator;
import com.google.common.base.Preconditions;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.exception.WorkIdAssignedException;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
import java.util.Collection;
@@ -33,39 +33,47 @@ import java.util.LinkedList;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Cluster worker id generator for cluster mode.
+ * Worker ID generator for cluster mode.
*/
-@RequiredArgsConstructor
@Slf4j
public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
- private final RegistryCenter registryCenter;
+ private final ClusterPersistRepository repository;
private final InstanceMetaData instanceMetaData;
- private volatile boolean isWarned;
+ private final ComputeNodeStatusService computeNodeStatusService;
+
+ private final AtomicBoolean isWarned = new AtomicBoolean(false);
+
+ public ClusterWorkerIdGenerator(final ClusterPersistRepository repository,
final InstanceMetaData instanceMetaData) {
+ this.repository = repository;
+ this.instanceMetaData = instanceMetaData;
+ computeNodeStatusService = new ComputeNodeStatusService(repository);
+ }
@Override
public int generate(final Properties props) {
- int result =
registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
+ int result =
computeNodeStatusService.loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
checkIneffectiveConfiguration(result, props);
return result;
}
- private Integer reGenerate() {
- Optional<Integer> result;
+ private int reGenerate() {
+ Optional<Integer> generatedWorkId;
do {
- result = generateAvailableWorkerId();
- } while (!result.isPresent());
- Integer generatedWorkId = result.get();
-
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(),
generatedWorkId);
- return generatedWorkId;
+ generatedWorkId = generateAvailableWorkerId();
+ } while (!generatedWorkId.isPresent());
+ int result = generatedWorkId.get();
+
computeNodeStatusService.persistInstanceWorkerId(instanceMetaData.getId(),
result);
+ return result;
}
private Optional<Integer> generateAvailableWorkerId() {
- Collection<Integer> assignedWorkerIds =
registryCenter.getComputeNodeStatusService().getAssignedWorkerIds();
+ Collection<Integer> assignedWorkerIds =
computeNodeStatusService.getAssignedWorkerIds();
ShardingSpherePreconditions.checkState(assignedWorkerIds.size() <=
1024, WorkIdAssignedException::new);
Collection<Integer> availableWorkerIds = new LinkedList<>();
for (int i = 0; i < 1024; i++) {
@@ -78,16 +86,16 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
Integer preselectedWorkerId = priorityQueue.poll();
Preconditions.checkState(null != preselectedWorkerId, "Preselected
worker-id can not be null.");
try {
-
registryCenter.getRepository().persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()),
instanceMetaData.getId());
+
repository.persistExclusiveEphemeral(WorkerIdNode.getWorkerIdGeneratorPath(preselectedWorkerId.toString()),
instanceMetaData.getId());
return Optional.of(preselectedWorkerId);
} catch (final ClusterPersistRepositoryException ignore) {
return Optional.empty();
}
}
- private void checkIneffectiveConfiguration(final long generatedWorkerId,
final Properties props) {
- if (!isWarned && null != props && props.containsKey(WORKER_ID_KEY)) {
- isWarned = true;
+ private void checkIneffectiveConfiguration(final int generatedWorkerId,
final Properties props) {
+ if (!isWarned.get() && null != props &&
props.containsKey(WORKER_ID_KEY)) {
+ isWarned.set(true);
log.warn("No need to configured {} in cluster mode, system
assigned {} was {}", WORKER_ID_KEY, WORKER_ID_KEY, generatedWorkerId);
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index 67c9d3e90ea..3a0ee8062ed 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -35,8 +35,8 @@ public final class ClusterEventSubscriberRegistry extends
EventSubscriberRegistr
new ConfigurationChangedSubscriber(contextManager),
new ResourceMetaDataChangedSubscriber(contextManager),
new DatabaseChangedSubscriber(contextManager),
- new StateChangedSubscriber(contextManager, registryCenter),
- new ProcessListChangedSubscriber(contextManager,
registryCenter),
+ new StateChangedSubscriber(contextManager,
registryCenter.getRepository()),
+ new ProcessListChangedSubscriber(contextManager,
registryCenter.getRepository()),
new CacheEvictedSubscriber());
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 9a87ffb3c92..296fadf72ee 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
@@ -27,7 +26,6 @@ import
org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
@@ -36,18 +34,24 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
* State changed subscriber.
*/
-@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class StateChangedSubscriber implements EventSubscriber {
private final ContextManager contextManager;
- private final RegistryCenter registryCenter;
+ private final ComputeNodeStatusService computeNodeStatusService;
+
+ public StateChangedSubscriber(final ContextManager contextManager, final
ClusterPersistRepository repository) {
+ this.contextManager = contextManager;
+ computeNodeStatusService = new ComputeNodeStatusService(repository);
+ }
/**
* Renew disabled data source names.
@@ -125,7 +129,7 @@ public final class StateChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
-
contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+
contextManager.getInstanceContext().addComputeNodeInstance(computeNodeStatusService.loadComputeNodeInstance(event.getInstanceMetaData()));
}
/**
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index c9f7ccd6ba0..ac7cb7ae419 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -34,7 +34,6 @@ import
org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
@@ -74,7 +73,8 @@ class ProcessListChangedSubscriberTest {
private ContextManager contextManager;
- private RegistryCenter registryCenter;
+ @Mock
+ private ClusterPersistRepository repository;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereDatabase database;
@@ -86,8 +86,7 @@ class ProcessListChangedSubscriberTest {
contextManager.renewMetaDataContexts(new
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new
ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
- registryCenter = new RegistryCenter(eventBusContext,
mock(ClusterPersistRepository.class), mock(ProxyInstanceMetaData.class), null);
- subscriber = new ProcessListChangedSubscriber(contextManager,
registryCenter);
+ subscriber = new ProcessListChangedSubscriber(contextManager,
repository);
}
private ContextManagerBuilderParameter
createContextManagerBuilderParameter() {
@@ -118,7 +117,6 @@ class ProcessListChangedSubscriberTest {
ProcessRegistry.getInstance().add(process);
String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
subscriber.reportLocalProcesses(new
ReportLocalProcessesEvent(instanceId, processId));
- ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
"processes:" + System.lineSeparator() + "- completedUnitCount:
0" + System.lineSeparator()
+ " id: foo_id" + System.lineSeparator()
@@ -148,7 +146,6 @@ class ProcessListChangedSubscriberTest {
String instanceId =
contextManager.getInstanceContext().getInstance().getMetaData().getId();
String processId = "foo_id";
subscriber.killLocalProcess(new KillLocalProcessEvent(instanceId,
processId));
- ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).delete("/nodes/compute_nodes/kill_process_trigger/"
+ instanceId + ":foo_id");
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
index 1dcdf6f1dd0..e04ab181068 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGeneratorTest.java
@@ -19,20 +19,16 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.test.util.PropertiesBuilder;
import org.apache.shardingsphere.test.util.PropertiesBuilder.Property;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
-import java.util.Collections;
-import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,21 +39,17 @@ class ClusterWorkerIdGeneratorTest {
void assertGenerateWithExistedWorkerId() {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
when(instanceMetaData.getId()).thenReturn("foo_id");
- RegistryCenter registryCenter = mock(RegistryCenter.class,
RETURNS_DEEP_STUBS);
-
when(registryCenter.getComputeNodeStatusService().loadInstanceWorkerId("foo_id")).thenReturn(Optional.of(10));
- assertThat(new ClusterWorkerIdGenerator(registryCenter,
instanceMetaData).generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
+ ClusterPersistRepository repository =
mock(ClusterPersistRepository.class);
+
when(repository.getDirectly("/nodes/compute_nodes/worker_id/foo_id")).thenReturn("10");
+ assertThat(new ClusterWorkerIdGenerator(repository,
instanceMetaData).generate(PropertiesBuilder.build(new
Property(WorkerIdGenerator.WORKER_ID_KEY, "1"))), is(10));
}
@Test
void assertGenerateWithoutExistedWorkerId() {
InstanceMetaData instanceMetaData = mock(InstanceMetaData.class);
when(instanceMetaData.getId()).thenReturn("foo_id");
- RegistryCenter registryCenter = mock(RegistryCenter.class,
RETURNS_DEEP_STUBS);
ClusterPersistRepository repository =
mock(ClusterPersistRepository.class);
doAnswer((Answer<Object>) invocation ->
"foo_id").when(repository).persistEphemeral("/worker_id/0", "foo_id");
- when(registryCenter.getRepository()).thenReturn(repository);
-
when(registryCenter.getComputeNodeStatusService().loadInstanceWorkerId("foo_id")).thenReturn(Optional.empty());
-
when(registryCenter.getComputeNodeStatusService().getAssignedWorkerIds()).thenReturn(Collections.singleton(1));
- assertThat(new ClusterWorkerIdGenerator(registryCenter,
instanceMetaData).generate(new Properties()), is(0));
+ assertThat(new ClusterWorkerIdGenerator(repository,
instanceMetaData).generate(new Properties()), is(0));
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 2f51892339c..c036c505d23 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -36,7 +36,6 @@ import
org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
@@ -91,7 +90,7 @@ class StateChangedSubscriberTest {
contextManager.renewMetaDataContexts(new
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new
ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
- subscriber = new StateChangedSubscriber(contextManager, new
RegistryCenter(eventBusContext, mock(ClusterPersistRepository.class),
mock(ProxyInstanceMetaData.class), null));
+ subscriber = new StateChangedSubscriber(contextManager,
mock(ClusterPersistRepository.class));
}
private ContextManagerBuilderParameter
createContextManagerBuilderParameter() {
diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
index 0fafbb17c9a..6030c1de961 100644
--- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
+++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
import java.util.Properties;
/**
- * Worker id generator for standalone mode.
+ * Worker ID generator for standalone mode.
*/
public final class StandaloneWorkerIdGenerator implements WorkerIdGenerator {