This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c7648695a52 Add InternalEventSubscriberRegistry (#31628)
c7648695a52 is described below

commit c7648695a521dd16cfdf7601fe137bb11679c557
Author: Haoran Meng <[email protected]>
AuthorDate: Tue Jun 11 10:01:02 2024 +0800

    Add InternalEventSubscriberRegistry (#31628)
---
 .../cluster/ClusterContextManagerBuilder.java      | 11 +++------
 ... => InternalQualifiedDataSourceSubscriber.java} |  2 +-
 .../InternalEventSubscriberRegistry.java}          | 27 +++++++---------------
 .../QualifiedDataSourceStateSubscriberTest.java    |  4 ++--
 4 files changed, 14 insertions(+), 30 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 67ffdc0701b..f536bc4b84e 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -31,13 +31,13 @@ import 
org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.InternalEventSubscriberRegistry;
 import 
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
@@ -64,7 +64,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         MetaDataContexts metaDataContexts = 
MetaDataContextsFactory.create(metaDataPersistService, param, 
computeNodeInstanceContext,
                 new 
QualifiedDataSourceStatePersistService(repository).loadStates());
         ContextManager result = new ContextManager(metaDataContexts, 
computeNodeInstanceContext, repository);
-        createSubscribers(eventBusContext, repository);
         registerOnline(eventBusContext, computeNodeInstanceContext, 
repository, param, result);
         return result;
     }
@@ -81,17 +80,13 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
                 new GlobalLockContext(new 
GlobalLockPersistService(repository)), eventBusContext);
     }
     
-    // TODO remove the method, only keep ZooKeeper's events, remove all 
decouple events
-    private void createSubscribers(final EventBusContext eventBusContext, 
final ClusterPersistRepository repository) {
-        eventBusContext.register(new 
QualifiedDataSourceStatusSubscriber(repository));
-    }
-    
     private void registerOnline(final EventBusContext eventBusContext, final 
ComputeNodeInstanceContext computeNodeInstanceContext,
                                 final ClusterPersistRepository repository, 
final ContextManagerBuilderParameter param, final ContextManager 
contextManager) {
         
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
         new GovernanceWatcherFactory(repository, 
eventBusContext).watchListeners();
         watchDatabaseMetaDataListener(param, 
contextManager.getPersistServiceFacade().getMetaDataPersistService(), 
eventBusContext);
         
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
+        new InternalEventSubscriberRegistry(contextManager, 
repository).register();
         new ClusterEventSubscriberRegistry(contextManager, 
repository).register();
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
similarity index 95%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
copy to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
index 21ff04409c5..50450cdd9b9 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
@@ -28,7 +28,7 @@ import 
org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
  * Qualified data source status subscriber.
  */
 @RequiredArgsConstructor
-public final class QualifiedDataSourceStatusSubscriber implements 
EventSubscriber {
+public final class InternalQualifiedDataSourceSubscriber implements 
EventSubscriber {
     
     private final ClusterPersistRepository repository;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
similarity index 52%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
index 21ff04409c5..eac9e3b9da6 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
@@ -15,30 +15,19 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.InternalQualifiedDataSourceSubscriber;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
 
 /**
- * Qualified data source status subscriber.
+ * Internal event subscriber registry.
  */
-@RequiredArgsConstructor
-public final class QualifiedDataSourceStatusSubscriber implements 
EventSubscriber {
+public class InternalEventSubscriberRegistry extends EventSubscriberRegistry {
     
-    private final ClusterPersistRepository repository;
-    
-    /**
-     * Delete qualified data source.
-     *
-     * @param event qualified data source deleted event
-     */
-    @Subscribe
-    public void delete(final QualifiedDataSourceDeletedEvent event) {
-        
repository.delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource()));
+    public InternalEventSubscriberRegistry(final ContextManager 
contextManager, final ClusterPersistRepository repository) {
+        super(contextManager, new 
InternalQualifiedDataSourceSubscriber(repository));
     }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
index 4a680d40f28..c0d6fc37a13 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import 
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.InternalQualifiedDataSourceSubscriber;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
 import org.junit.jupiter.api.Test;
@@ -41,7 +41,7 @@ class QualifiedDataSourceStateSubscriberTest {
         String groupName = "readwrite_ds";
         String dataSourceName = "replica_ds_0";
         QualifiedDataSourceDeletedEvent event = new 
QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName, 
groupName, dataSourceName));
-        new QualifiedDataSourceStatusSubscriber(repository).delete(event);
+        new InternalQualifiedDataSourceSubscriber(repository).delete(event);
         
verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new
 QualifiedDataSource(databaseName, groupName, dataSourceName)));
     }
 }

Reply via email to