This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a2de70671ce Add DeliverEventSubscriberFactory SPI (#34041)
a2de70671ce is described below
commit a2de70671ce1b632e932b440f351a1fb4b5b37f0
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 13 17:29:35 2024 +0800
Add DeliverEventSubscriberFactory SPI (#34041)
* Add DeliverEventSubscriberFactory SPI
* Add DeliverEventSubscriberFactory SPI
* Add DeliverEventSubscriberFactory SPI
---
...riteSplittingStaticDataSourceRuleAttribute.java | 2 +-
.../QualifiedDataSourceDeletedEvent.java | 2 +-
.../QualifiedDataSourceDeletedSubscriber.java | 11 ++++----
...ualifiedDataSourceDeletedSubscriberFactory.java | 23 +++++++----------
...ode.event.deliver.DeliverEventSubscriberFactory | 18 +++++++++++++
...SplittingStaticDataSourceRuleAttributeTest.java | 2 +-
.../QualifiedDataSourceDeletedSubscriberTest.java | 13 +++++-----
.../deliver/DeliverEventSubscriberFactory.java} | 30 +++++++++++-----------
.../cluster/ClusterContextManagerBuilder.java | 9 ++++---
9 files changed, 62 insertions(+), 48 deletions(-)
diff --git
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
index 03234fb9e05..5ecbd89d1bf 100644
---
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttribute.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
+import
org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
import
org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
import
org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;
diff --git
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
similarity index 94%
rename from
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
rename to
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
index 4f0497aa541..f21213b004d 100644
---
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/datasource/qualified/QualifiedDataSourceDeletedEvent.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.event.deliver.datasource.qualified;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
similarity index 76%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
rename to
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
index 252173f1105..878be851d63 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriber.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriber.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
- * Deliver data source status subscriber.
+ * Qualified data source deleted subscriber.
*/
@RequiredArgsConstructor
-public final class DeliverQualifiedDataSourceSubscriber implements
EventSubscriber {
+public final class QualifiedDataSourceDeletedSubscriber implements
EventSubscriber {
- private final ClusterPersistRepository repository;
+ private final PersistRepository repository;
/**
* Delete qualified data source.
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
similarity index 54%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
copy to
features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
index 3f874da0067..942a64d45aa 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberFactory.java
@@ -15,25 +15,20 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
-import lombok.Getter;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-import java.util.Collection;
-import java.util.Collections;
+import
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
- * Cluster deliver event subscriber registry.
+ * Qualified data source deleted subscriber factory.
*/
-@Getter
-public final class ClusterDeliverEventSubscriberRegistry {
-
- private final Collection<EventSubscriber> subscribers;
+public final class QualifiedDataSourceDeletedSubscriberFactory implements
DeliverEventSubscriberFactory {
- public ClusterDeliverEventSubscriberRegistry(final
ClusterPersistRepository repository) {
- subscribers = Collections.singleton(new
DeliverQualifiedDataSourceSubscriber(repository));
+ @Override
+ public EventSubscriber create(final PersistRepository repository, final
EventBusContext eventBusContext) {
+ return new QualifiedDataSourceDeletedSubscriber(repository);
}
}
diff --git
a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
new file mode 100644
index 00000000000..1578fc246e6
--- /dev/null
+++
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedSubscriberFactory
diff --git
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
index f56387f3ab4..cdfaf039bce 100644
---
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
+++
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/rule/attribute/ReadwriteSplittingStaticDataSourceRuleAttributeTest.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.readwritesplitting.rule.attribute;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
+import
org.apache.shardingsphere.readwritesplitting.subscriber.QualifiedDataSourceDeletedEvent;
import
org.apache.shardingsphere.readwritesplitting.exception.logic.ReadwriteSplittingDataSourceRuleNotFoundException;
import
org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingDataSourceGroupRule;
import org.junit.jupiter.api.Test;
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
similarity index 75%
rename from
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
rename to
features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
index f9d25ff1cf5..1cdc92dde2f 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/type/DeliverQualifiedDataSourceSubscriberTest.java
+++
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/subscriber/QualifiedDataSourceDeletedSubscriberTest.java
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type;
+package org.apache.shardingsphere.readwritesplitting.subscriber;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import
org.apache.shardingsphere.mode.event.deliver.datasource.qualified.QualifiedDataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -29,16 +28,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
-class DeliverQualifiedDataSourceSubscriberTest {
+class QualifiedDataSourceDeletedSubscriberTest {
- private DeliverQualifiedDataSourceSubscriber subscriber;
+ private QualifiedDataSourceDeletedSubscriber subscriber;
@Mock
- private ClusterPersistRepository repository;
+ private PersistRepository repository;
@BeforeEach
void setUp() {
- subscriber = new DeliverQualifiedDataSourceSubscriber(repository);
+ subscriber = new QualifiedDataSourceDeletedSubscriber(repository);
}
@Test
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
similarity index 54%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
rename to
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
index 3f874da0067..a43902000c2 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/deliver/subscriber/ClusterDeliverEventSubscriberRegistry.java
+++
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
@@ -15,25 +15,25 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber;
+package org.apache.shardingsphere.mode.event.deliver;
-import lombok.Getter;
+import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.type.DeliverQualifiedDataSourceSubscriber;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-import java.util.Collection;
-import java.util.Collections;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
- * Cluster deliver event subscriber registry.
+ * Deliver event subscriber factory.
*/
-@Getter
-public final class ClusterDeliverEventSubscriberRegistry {
-
- private final Collection<EventSubscriber> subscribers;
+@SingletonSPI
+public interface DeliverEventSubscriberFactory {
- public ClusterDeliverEventSubscriberRegistry(final
ClusterPersistRepository repository) {
- subscribers = Collections.singleton(new
DeliverQualifiedDataSourceSubscriber(repository));
- }
+ /**
+ * Create deliver event subscriber.
+ *
+ * @param repository cluster persist repository
+ * @param eventBusContext event bus context
+ * @return created event subscriber
+ */
+ EventSubscriber create(PersistRepository repository, EventBusContext
eventBusContext);
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index a8c574fd3a5..c942b31c0fc 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.lock.LockContext;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
@@ -31,10 +32,10 @@ import
org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import
org.apache.shardingsphere.mode.manager.cluster.event.deliver.subscriber.ClusterDeliverEventSubscriberRegistry;
+import
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
-import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -44,6 +45,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import java.sql.SQLException;
import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Cluster context manager builder.
@@ -78,7 +80,8 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager,
getDatabaseNames(param,
contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
EventSubscriberRegistry eventSubscriberRegistry = new
EventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
- eventSubscriberRegistry.register(new
ClusterDeliverEventSubscriberRegistry(repository).getSubscribers());
+
eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
+ .map(each -> each.create(repository,
contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
eventSubscriberRegistry.register(new
ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
}