This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 010462f6a0c Refactor RegistryCenter (#31254)
010462f6a0c is described below
commit 010462f6a0c6f1d6efd1c91129143aaf74e2e735
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 16 23:43:45 2024 +0800
Refactor RegistryCenter (#31254)
* Refactor ClusterContextManagerBuilder
* Refactor RegistryCenter
* Refactor RegistryCenter
* Refactor RegistryCenter
---
.../manager/cluster/ClusterContextManagerBuilder.java | 16 +++++++++++++---
.../manager/cluster/coordinator/RegistryCenter.java | 18 ------------------
.../distsql/ral/updatable/LockClusterExecutorTest.java | 3 ++-
.../ral/updatable/UnlockClusterExecutorTest.java | 3 ++-
4 files changed, 17 insertions(+), 23 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index d97d32bab41..2f6f3c59ac8 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.mode.manager.ContextManagerAware;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
@@ -44,9 +45,13 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
+import
org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
+import
org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;
import java.sql.SQLException;
import java.util.Optional;
+import java.util.Properties;
/**
* Cluster context manager builder.
@@ -62,7 +67,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
((InstanceContextAware)
registryCenter.getRepository()).setInstanceContext(instanceContext);
}
MetaDataPersistService metaDataPersistService = new
MetaDataPersistService(repository);
- MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext,
registryCenter.getQualifiedDataSourceStatusService().loadStatus());
+ MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext,
new QualifiedDataSourceStatusService(repository).loadStatus());
ContextManager result = new ContextManager(metaDataContexts,
instanceContext);
setContextManagerAware(result);
createSubscribers(eventBusContext, repository);
@@ -79,8 +84,13 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
}
private InstanceContext buildInstanceContext(final RegistryCenter
registryCenter, final ContextManagerBuilderParameter param, final
EventBusContext eventBusContext) {
- return new InstanceContext(new
ComputeNodeInstance(param.getInstanceMetaData()), new
ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
- param.getModeConfiguration(), new ClusterModeContextManager(),
new GlobalLockContext(registryCenter.getGlobalLockPersistService()),
eventBusContext);
+ return new InstanceContext(new
ComputeNodeInstance(param.getInstanceMetaData()), new
ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
param.getModeConfiguration(),
+ new ClusterModeContextManager(), new GlobalLockContext(new
GlobalLockPersistService(initDistributedLockHolder(registryCenter.getRepository()))),
eventBusContext);
+ }
+
+ private DistributedLockHolder initDistributedLockHolder(final
ClusterPersistRepository repository) {
+ DistributedLockHolder distributedLockHolder =
repository.getDistributedLockHolder();
+ return null == distributedLockHolder ? new
DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new
Properties())) : distributedLockHolder;
}
private void setContextManagerAware(final ContextManager contextManager) {
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 0b92eff5d3f..34ae942497e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -23,16 +23,11 @@ import
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
-import
org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
-import
org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;
import java.util.Map;
-import java.util.Properties;
/**
* Registry center.
@@ -46,15 +41,9 @@ public final class RegistryCenter {
private final Map<String, DatabaseConfiguration> databaseConfigs;
- @Getter
- private final QualifiedDataSourceStatusService
qualifiedDataSourceStatusService;
-
@Getter
private final ComputeNodeStatusService computeNodeStatusService;
- @Getter
- private final GlobalLockPersistService globalLockPersistService;
-
private final GovernanceWatcherFactory listenerFactory;
public RegistryCenter(final EventBusContext eventBusContext,
@@ -62,17 +51,10 @@ public final class RegistryCenter {
this.repository = repository;
this.instanceMetaData = instanceMetaData;
this.databaseConfigs = databaseConfigs;
- qualifiedDataSourceStatusService = new
QualifiedDataSourceStatusService(repository);
computeNodeStatusService = new ComputeNodeStatusService(repository);
- globalLockPersistService = new
GlobalLockPersistService(initDistributedLockHolder(repository));
listenerFactory = new GovernanceWatcherFactory(repository,
eventBusContext, getJDBCDatabaseName());
}
- private DistributedLockHolder initDistributedLockHolder(final
ClusterPersistRepository repository) {
- DistributedLockHolder distributedLockHolder =
repository.getDistributedLockHolder();
- return null == distributedLockHolder ? new
DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new
Properties())) : distributedLockHolder;
- }
-
private String getJDBCDatabaseName() {
return instanceMetaData instanceof JDBCInstanceMetaData ?
databaseConfigs.keySet().stream().findFirst().orElse(null) : null;
}
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java
index b14babcb81e..e43deee62e8 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
import
org.apache.shardingsphere.distsql.statement.ral.updatable.LockClusterStatement;
import
org.apache.shardingsphere.infra.spi.exception.ServiceProviderNotFoundException;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
+import org.apache.shardingsphere.mode.exception.LockedClusterException;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -45,7 +46,7 @@ class LockClusterExecutorTest {
void assertExecuteUpdateWithLockedCluster() {
ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.UNAVAILABLE);
- assertThrows(IllegalStateException.class, () ->
executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new
Properties())), contextManager));
+ assertThrows(LockedClusterException.class, () ->
executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new
Properties())), contextManager));
}
@Test
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java
index ff4e3365d57..99bc88db730 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
import
org.apache.shardingsphere.distsql.statement.ral.updatable.UnlockClusterStatement;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
+import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -42,6 +43,6 @@ class UnlockClusterExecutorTest {
ContextManager contextManager = mock(ContextManager.class,
RETURNS_DEEP_STUBS);
when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.OK);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
- assertThrows(IllegalStateException.class, () ->
executor.executeUpdate(new UnlockClusterStatement(), contextManager));
+ assertThrows(NotLockedClusterException.class, () ->
executor.executeUpdate(new UnlockClusterStatement(), contextManager));
}
}