This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c7648695a52 Add InternalEventSubscriberRegistry (#31628)
c7648695a52 is described below
commit c7648695a521dd16cfdf7601fe137bb11679c557
Author: Haoran Meng <[email protected]>
AuthorDate: Tue Jun 11 10:01:02 2024 +0800
Add InternalEventSubscriberRegistry (#31628)
---
.../cluster/ClusterContextManagerBuilder.java | 11 +++------
... => InternalQualifiedDataSourceSubscriber.java} | 2 +-
.../InternalEventSubscriberRegistry.java} | 27 +++++++---------------
.../QualifiedDataSourceStateSubscriberTest.java | 4 ++--
4 files changed, 14 insertions(+), 30 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 67ffdc0701b..f536bc4b84e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -31,13 +31,13 @@ import
org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataWatchListenerManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.InternalEventSubscriberRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
@@ -64,7 +64,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(metaDataPersistService, param,
computeNodeInstanceContext,
new
QualifiedDataSourceStatePersistService(repository).loadStates());
ContextManager result = new ContextManager(metaDataContexts,
computeNodeInstanceContext, repository);
- createSubscribers(eventBusContext, repository);
registerOnline(eventBusContext, computeNodeInstanceContext,
repository, param, result);
return result;
}
@@ -81,17 +80,13 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
new GlobalLockContext(new
GlobalLockPersistService(repository)), eventBusContext);
}
- // TODO remove the method, only keep ZooKeeper's events, remove all
decouple events
- private void createSubscribers(final EventBusContext eventBusContext,
final ClusterPersistRepository repository) {
- eventBusContext.register(new
QualifiedDataSourceStatusSubscriber(repository));
- }
-
private void registerOnline(final EventBusContext eventBusContext, final
ComputeNodeInstanceContext computeNodeInstanceContext,
final ClusterPersistRepository repository,
final ContextManagerBuilderParameter param, final ContextManager
contextManager) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
new GovernanceWatcherFactory(repository,
eventBusContext).watchListeners();
watchDatabaseMetaDataListener(param,
contextManager.getPersistServiceFacade().getMetaDataPersistService(),
eventBusContext);
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
+ new InternalEventSubscriberRegistry(contextManager,
repository).register();
new ClusterEventSubscriberRegistry(contextManager,
repository).register();
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
similarity index 95%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
copy to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
index 21ff04409c5..50450cdd9b9 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/InternalQualifiedDataSourceSubscriber.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
* Qualified data source status subscriber.
*/
@RequiredArgsConstructor
-public final class QualifiedDataSourceStatusSubscriber implements
EventSubscriber {
+public final class InternalQualifiedDataSourceSubscriber implements
EventSubscriber {
private final ClusterPersistRepository repository;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
similarity index 52%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
index 21ff04409c5..eac9e3b9da6 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/InternalEventSubscriberRegistry.java
@@ -15,30 +15,19 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.InternalQualifiedDataSourceSubscriber;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
/**
- * Qualified data source status subscriber.
+ * Internal event subscriber registry.
*/
-@RequiredArgsConstructor
-public final class QualifiedDataSourceStatusSubscriber implements
EventSubscriber {
+public class InternalEventSubscriberRegistry extends EventSubscriberRegistry {
- private final ClusterPersistRepository repository;
-
- /**
- * Delete qualified data source.
- *
- * @param event qualified data source deleted event
- */
- @Subscribe
- public void delete(final QualifiedDataSourceDeletedEvent event) {
-
repository.delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(event.getQualifiedDataSource()));
+ public InternalEventSubscriberRegistry(final ContextManager
contextManager, final ClusterPersistRepository repository) {
+ super(contextManager, new
InternalQualifiedDataSourceSubscriber(repository));
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
index 4a680d40f28..c0d6fc37a13 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.InternalQualifiedDataSourceSubscriber;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
import org.junit.jupiter.api.Test;
@@ -41,7 +41,7 @@ class QualifiedDataSourceStateSubscriberTest {
String groupName = "readwrite_ds";
String dataSourceName = "replica_ds_0";
QualifiedDataSourceDeletedEvent event = new
QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName,
groupName, dataSourceName));
- new QualifiedDataSourceStatusSubscriber(repository).delete(event);
+ new InternalQualifiedDataSourceSubscriber(repository).delete(event);
verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new
QualifiedDataSource(databaseName, groupName, dataSourceName)));
}
}