This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d5e165322b1 Refactor RegistryCenter (#31233)
d5e165322b1 is described below
commit d5e165322b1bafd6e1129fa8bad40e40f55bedb0
Author: Liang Zhang <[email protected]>
AuthorDate: Wed May 15 22:45:21 2024 +0800
Refactor RegistryCenter (#31233)
* Refactor RegistryCenter
* Refactor RegistryCenter
---
.../mode/manager/cluster/coordinator/RegistryCenter.java | 10 +++++-----
.../ShardingSphereSchemaDataRegistrySubscriber.java | 4 +---
.../process/subscriber/ClusterProcessSubscriber.java | 11 +++--------
.../cluster/subscriber/ClusterStatusSubscriber.java | 8 ++------
.../compute/subscriber/ComputeNodeStatusSubscriber.java | 16 +++-------------
.../subscriber/QualifiedDataSourceStatusSubscriber.java | 8 ++------
.../cluster/subscriber/ClusterStatusSubscriberTest.java | 13 ++++++-------
.../QualifiedDataSourceStatusSubscriberTest.java | 5 +----
8 files changed, 23 insertions(+), 52 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index f96d03fab2a..f3372d843eb 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -95,11 +95,11 @@ public final class RegistryCenter {
}
private void createSubscribers(final ClusterPersistRepository repository) {
- new ComputeNodeStatusSubscriber(this, repository);
- new ClusterStatusSubscriber(repository, eventBusContext);
- new QualifiedDataSourceStatusSubscriber(repository, eventBusContext);
- new ClusterProcessSubscriber(repository, eventBusContext);
- new ShardingSphereSchemaDataRegistrySubscriber(repository,
eventBusContext);
+ eventBusContext.register(new ComputeNodeStatusSubscriber(this,
repository));
+ eventBusContext.register(new ClusterStatusSubscriber(repository));
+ eventBusContext.register(new
QualifiedDataSourceStatusSubscriber(repository));
+ eventBusContext.register(new ClusterProcessSubscriber(repository,
eventBusContext));
+ eventBusContext.register(new
ShardingSphereSchemaDataRegistrySubscriber(repository));
}
/**
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 8c738752d9c..c7843a645ed 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
@@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber
implements EventSu
private final ShardingSphereDataPersistService persistService;
- public ShardingSphereSchemaDataRegistrySubscriber(final
ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+ public ShardingSphereSchemaDataRegistrySubscriber(final
ClusterPersistRepository repository) {
persistService = new ShardingSphereDataPersistService(repository);
- eventBusContext.register(this);
}
/**
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
index f6554b0b5fc..ca7219b50bb 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
@@ -42,20 +43,14 @@ import java.util.stream.Stream;
/**
* Cluster process subscriber.
*/
+@RequiredArgsConstructor
public final class ClusterProcessSubscriber implements ProcessSubscriber,
EventSubscriber {
private final PersistRepository repository;
private final EventBusContext eventBusContext;
- private final YamlProcessListSwapper swapper;
-
- public ClusterProcessSubscriber(final PersistRepository repository, final
EventBusContext eventBusContext) {
- this.repository = repository;
- this.eventBusContext = eventBusContext;
- swapper = new YamlProcessListSwapper();
- eventBusContext.register(this);
- }
+ private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
@Override
@Subscribe
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
index 12007181f1c..da35826018e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
@@ -27,15 +27,11 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
/**
* Cluster status subscriber.
*/
+@RequiredArgsConstructor
public final class ClusterStatusSubscriber implements EventSubscriber {
private final ClusterPersistRepository repository;
- public ClusterStatusSubscriber(final ClusterPersistRepository repository,
final EventBusContext eventBusContext) {
- this.repository = repository;
- eventBusContext.register(this);
- }
-
/**
* Update cluster status.
*
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 3b24717a67b..c1f26ea4b70 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
@@ -25,23 +26,16 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.Collections;
-
/**
* Compute node status subscriber.
*/
+@RequiredArgsConstructor
public final class ComputeNodeStatusSubscriber implements EventSubscriber {
private final RegistryCenter registryCenter;
private final ClusterPersistRepository repository;
- public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter,
final ClusterPersistRepository repository) {
- this.registryCenter = registryCenter;
- this.repository = repository;
- registryCenter.getEventBusContext().register(this);
- }
-
/**
* Update compute node status.
*
@@ -59,10 +53,6 @@ public final class ComputeNodeStatusSubscriber implements
EventSubscriber {
*/
@Subscribe
public void update(final LabelsChangedEvent event) {
- if (event.getLabels().isEmpty()) {
-
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(),
Collections.emptyList());
- } else {
-
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(),
event.getLabels());
- }
+
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(),
event.getLabels());
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
index 7893bf4d959..1dd359778a6 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -27,15 +27,11 @@ import
org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
/**
* Qualified data source status subscriber.
*/
+@RequiredArgsConstructor
public final class QualifiedDataSourceStatusSubscriber implements
EventSubscriber {
private final ClusterPersistRepository repository;
- public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository
repository, final EventBusContext eventBusContext) {
- this.repository = repository;
- eventBusContext.register(this);
- }
-
/**
* Delete qualified data source.
*
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
index 0657f71377c..110e7395af4 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
@@ -17,14 +17,13 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.junit.jupiter.api.extension.ExtendWith;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.verify;
@@ -37,7 +36,7 @@ class ClusterStatusSubscriberTest {
@Test
void assertUpdate() {
- ClusterStatusSubscriber clusterStatusSubscriber = new
ClusterStatusSubscriber(repository, new EventBusContext());
+ ClusterStatusSubscriber clusterStatusSubscriber = new
ClusterStatusSubscriber(repository);
ClusterStatusChangedEvent event = new
ClusterStatusChangedEvent(ClusterState.OK);
clusterStatusSubscriber.update(event);
verify(repository).persist(ComputeNode.getClusterStatusNodePath(),
ClusterState.OK.name());
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
index 69a2e31cdb9..c28beba4fab 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
@@ -35,15 +34,13 @@ class QualifiedDataSourceStatusSubscriberTest {
@Mock
private ClusterPersistRepository repository;
- private final EventBusContext eventBusContext = new EventBusContext();
-
@Test
void assertDeleteStorageNodeDataSourceDataSourceState() {
String databaseName = "replica_query_db";
String groupName = "readwrite_ds";
String dataSourceName = "replica_ds_0";
QualifiedDataSourceDeletedEvent event = new
QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName,
groupName, dataSourceName));
- new QualifiedDataSourceStatusSubscriber(repository,
eventBusContext).delete(event);
+ new QualifiedDataSourceStatusSubscriber(repository).delete(event);
verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new
QualifiedDataSource(databaseName, groupName, dataSourceName)));
}
}