This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a2de70671ce Add DeliverEventSubscriberFactory SPI (#34041)
a2de70671ce is described below

commit a2de70671ce1b632e932b440f351a1fb4b5b37f0
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 13 17:29:35 2024 +0800

    Add DeliverEventSubscriberFactory SPI (#34041)
    
    * Add DeliverEventSubscriberFactory SPI
    
    * Add DeliverEventSubscriberFactory SPI
    
    * Add DeliverEventSubscriberFactory SPI
---
 ...riteSplittingStaticDataSourceRuleAttribute.java |  2 +-
 .../QualifiedDataSourceDeletedEvent.java           |  2 +-
 .../QualifiedDataSourceDeletedSubscriber.java      | 11 ++++----
 ...ualifiedDataSourceDeletedSubscriberFactory.java | 23 +++++++----------
 ...ode.event.deliver.DeliverEventSubscriberFactory | 18 +++++++++++++
 ...SplittingStaticDataSourceRuleAttributeTest.java |  2 +-
 .../QualifiedDataSourceDeletedSubscriberTest.java  | 13 +++++-----
 .../deliver/DeliverEventSubscriberFactory.java}    | 30 +++++++++++-----------
 .../cluster/ClusterContextManagerBuilder.java      |  9 ++++---
 9 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
index 03234fb9e05..5ecbd89d1bf 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
+import org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
 import org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
 import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;
 
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
similarity index 94%
rename from 
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
rename to 
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
index 4f0497aa541..f21213b004d 100644
--- 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
+++ 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.event.deliver.datasource.qualified;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
similarity index 76%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
rename to 
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
index 252173f1105..878be851d63 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
+++ 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
@@ -15,22 +15,21 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
 
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
- * Deliver data source status subscriber.
+ * Qualified data source deleted subscriber.
  */
 @RequiredArgsConstructor
-public final class DeliverQualifiedDataSourceSubscriber implements EventSubscriber {
+public final class QualifiedDataSourceDeletedSubscriber implements EventSubscriber {
     
-    private final ClusterPersistRepository repository;
+    private final PersistRepository repository;
     
     /**
      * Delete qualified data source.
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
similarity index 54%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
copy to 
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
index 3f874da0067..942a64d45aa 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
+++ 
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
 
-import lombok.Getter;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-import java.util.Collection;
-import java.util.Collections;
+import 
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
- * Cluster deliver event subscriber registry.
+ * Qualified data source deleted subscriber factory.
  */
-@Getter
-public final class ClusterDeliverEventSubscriberRegistry {
-    
-    private final Collection<EventSubscriber> subscribers;
+public final class QualifiedDataSourceDeletedSubscriberFactory implements DeliverEventSubscriberFactory {
     
-    public ClusterDeliverEventSubscriberRegistry(final ClusterPersistRepository repository) {
-        subscribers = Collections.singleton(new DeliverQualifiedDataSourceSubscriber(repository));
+    @Override
+    public EventSubscriber create(final PersistRepository repository, final EventBusContext eventBusContext) {
+        return new QualifiedDataSourceDeletedSubscriber(repository);
     }
 }
diff --git 
a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
 
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
new file mode 100644
index 00000000000..1578fc246e6
--- /dev/null
+++ 
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedSubscriberFactory
diff --git 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
index f56387f3ab4..cdfaf039bce 100644
--- 
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
+++ 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.readwritesplitting.rule.attribute;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import 
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
+import 
org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
 import 
org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
 import 
org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;
 import org.junit.jupiter.api.Test;
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
similarity index 75%
rename from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
rename to 
features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
index f9d25ff1cf5..1cdc92dde2f 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
+++ 
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
 
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import 
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,16 +28,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
-class DeliverQualifiedDataSourceSubscriberTest {
+class QualifiedDataSourceDeletedSubscriberTest {
     
-    private DeliverQualifiedDataSourceSubscriber subscriber;
+    private QualifiedDataSourceDeletedSubscriber subscriber;
     
     @Mock
-    private ClusterPersistRepository repository;
+    private PersistRepository repository;
     
     @BeforeEach
     void setUp() {
-        subscriber = new DeliverQualifiedDataSourceSubscriber(repository);
+        subscriber = new QualifiedDataSourceDeletedSubscriber(repository);
     }
     
     @Test
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
similarity index 54%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
rename to 
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
index 3f874da0067..a43902000c2 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
@@ -15,25 +15,25 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber;
+package org.apache.shardingsphere.mode.event.deliver;
 
-import lombok.Getter;
+import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-import java.util.Collection;
-import java.util.Collections;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
- * Cluster deliver event subscriber registry.
+ * Deliver event subscriber factory.
  */
-@Getter
-public final class ClusterDeliverEventSubscriberRegistry {
-    
-    private final Collection<EventSubscriber> subscribers;
+@SingletonSPI
+public interface DeliverEventSubscriberFactory {
     
-    public ClusterDeliverEventSubscriberRegistry(final 
ClusterPersistRepository repository) {
-        subscribers = Collections.singleton(new 
DeliverQualifiedDataSourceSubscriber(repository));
-    }
+    /**
+     * Create deliver event subscriber.
+     *
+     * @param repository cluster persist repository
+     * @param eventBusContext event bus context
+     * @return created event subscriber
+     */
+    EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext);
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index a8c574fd3a5..c942b31c0fc 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import 
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
 import org.apache.shardingsphere.infra.lock.LockContext;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
@@ -31,10 +32,10 @@ import 
org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import 
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.ClusterDeliverEventSubscriberRegistry;
+import 
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
 import 
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
-import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
 import 
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -44,6 +45,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.stream.Collectors;
 
 /**
  * Cluster context manager builder.
@@ -78,7 +80,8 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
         contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
         new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
         EventSubscriberRegistry eventSubscriberRegistry = new EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
-        eventSubscriberRegistry.register(new ClusterDeliverEventSubscriberRegistry(repository).getSubscribers());
+        eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
+                .map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
         eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
     }
     

Reply via email to