This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d5e165322b1 Refactor RegistryCenter (#31233)
d5e165322b1 is described below

commit d5e165322b1bafd6e1129fa8bad40e40f55bedb0
Author: Liang Zhang <[email protected]>
AuthorDate: Wed May 15 22:45:21 2024 +0800

    Refactor RegistryCenter (#31233)
    
    * Refactor RegistryCenter
    
    * Refactor RegistryCenter
---
 .../mode/manager/cluster/coordinator/RegistryCenter.java | 10 +++++-----
 .../ShardingSphereSchemaDataRegistrySubscriber.java      |  4 +---
 .../process/subscriber/ClusterProcessSubscriber.java     | 11 +++--------
 .../cluster/subscriber/ClusterStatusSubscriber.java      |  8 ++------
 .../compute/subscriber/ComputeNodeStatusSubscriber.java  | 16 +++-------------
 .../subscriber/QualifiedDataSourceStatusSubscriber.java  |  8 ++------
 .../cluster/subscriber/ClusterStatusSubscriberTest.java  | 13 ++++++-------
 .../QualifiedDataSourceStatusSubscriberTest.java         |  5 +----
 8 files changed, 23 insertions(+), 52 deletions(-)

diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index f96d03fab2a..f3372d843eb 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -95,11 +95,11 @@ public final class RegistryCenter {
     }
     
     private void createSubscribers(final ClusterPersistRepository repository) {
-        new ComputeNodeStatusSubscriber(this, repository);
-        new ClusterStatusSubscriber(repository, eventBusContext);
-        new QualifiedDataSourceStatusSubscriber(repository, eventBusContext);
-        new ClusterProcessSubscriber(repository, eventBusContext);
-        new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
+        eventBusContext.register(new ComputeNodeStatusSubscriber(this, repository));
+        eventBusContext.register(new ClusterStatusSubscriber(repository));
+        eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
+        eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
+        eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository));
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
index 8c738752d9c..c7843a645ed 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
@@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSu
     
     private final ShardingSphereDataPersistService persistService;
     
-    public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
+    public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository) {
         persistService = new ShardingSphereDataPersistService(repository);
-        eventBusContext.register(this);
     }
     
     /**
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
index f6554b0b5fc..ca7219b50bb 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
 
 import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
 import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
@@ -42,20 +43,14 @@ import java.util.stream.Stream;
 /**
  * Cluster process subscriber.
  */
+@RequiredArgsConstructor
 public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber {
     
     private final PersistRepository repository;
     
     private final EventBusContext eventBusContext;
     
-    private final YamlProcessListSwapper swapper;
-    
-    public ClusterProcessSubscriber(final PersistRepository repository, final EventBusContext eventBusContext) {
-        this.repository = repository;
-        this.eventBusContext = eventBusContext;
-        swapper = new YamlProcessListSwapper();
-        eventBusContext.register(this);
-    }
+    private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
     
     @Override
     @Subscribe
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
index 12007181f1c..da35826018e 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
@@ -27,15 +27,11 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 /**
  * Cluster status subscriber.
  */
+@RequiredArgsConstructor
 public final class ClusterStatusSubscriber implements EventSubscriber {
     
     private final ClusterPersistRepository repository;
     
-    public ClusterStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
-        this.repository = repository;
-        eventBusContext.register(this);
-    }
-    
     /**
      * Update cluster status.
      *
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 3b24717a67b..c1f26ea4b70 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;
 
 import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
@@ -25,23 +26,16 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
-import java.util.Collections;
-
 /**
  * Compute node status subscriber.
  */
+@RequiredArgsConstructor
 public final class ComputeNodeStatusSubscriber implements EventSubscriber {
     
     private final RegistryCenter registryCenter;
     
     private final ClusterPersistRepository repository;
     
-    public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter, final ClusterPersistRepository repository) {
-        this.registryCenter = registryCenter;
-        this.repository = repository;
-        registryCenter.getEventBusContext().register(this);
-    }
-    
     /**
      * Update compute node status.
      *
@@ -59,10 +53,6 @@ public final class ComputeNodeStatusSubscriber implements EventSubscriber {
      */
     @Subscribe
     public void update(final LabelsChangedEvent event) {
-        if (event.getLabels().isEmpty()) {
-            registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
-        } else {
-            registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
-        }
+        registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
     }
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
index 7893bf4d959..1dd359778a6 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -27,15 +27,11 @@ import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
 /**
  * Qualified data source status subscriber.
  */
+@RequiredArgsConstructor
 public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber {
     
     private final ClusterPersistRepository repository;
     
-    public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
-        this.repository = repository;
-        eventBusContext.register(this);
-    }
-    
     /**
      * Delete qualified data source.
      *
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
index 0657f71377c..110e7395af4 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriberTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
 
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
 import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import static org.mockito.Mockito.verify;
@@ -37,7 +36,7 @@ class ClusterStatusSubscriberTest {
     
     @Test
     void assertUpdate() {
-        ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository, new EventBusContext());
+        ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository);
         ClusterStatusChangedEvent event = new ClusterStatusChangedEvent(ClusterState.OK);
         clusterStatusSubscriber.update(event);
         verify(repository).persist(ComputeNode.getClusterStatusNodePath(), ClusterState.OK.name());
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
index 69a2e31cdb9..c28beba4fab 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriberTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
 
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
@@ -35,15 +34,13 @@ class QualifiedDataSourceStatusSubscriberTest {
     @Mock
     private ClusterPersistRepository repository;
     
-    private final EventBusContext eventBusContext = new EventBusContext();
-    
     @Test
     void assertDeleteStorageNodeDataSourceDataSourceState() {
         String databaseName = "replica_query_db";
         String groupName = "readwrite_ds";
         String dataSourceName = "replica_ds_0";
         QualifiedDataSourceDeletedEvent event = new QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName, groupName, dataSourceName));
-        new QualifiedDataSourceStatusSubscriber(repository, eventBusContext).delete(event);
+        new QualifiedDataSourceStatusSubscriber(repository).delete(event);
         verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new QualifiedDataSource(databaseName, groupName, dataSourceName)));
     }
 }

Reply via email to