This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ed0db5385e3 Add DeliverEventSubscriber (#34067)
ed0db5385e3 is described below
commit ed0db5385e3848699990a8252cddfb0bb462f04c
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 15 20:57:14 2024 +0800
Add DeliverEventSubscriber (#34067)
* Add DeliverEventSubscriber
* Add DeliverEventSubscriber
* Add DeliverEventSubscriber
---
...ittingQualifiedDataSourceDeletedSubscriber.java | 10 +++----
...ualifiedDataSourceDeletedSubscriberFactory.java | 34 ----------------------
...here.mode.event.deliver.DeliverEventSubscriber} | 2 +-
...ngQualifiedDataSourceDeletedSubscriberTest.java | 3 +-
...berFactory.java => DeliverEventSubscriber.java} | 11 +++----
.../cluster/ClusterContextManagerBuilder.java | 19 ++++++++----
6 files changed, 26 insertions(+), 53 deletions(-)
diff --git
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
index 70ffc7bc4a8..c24ee7f195e 100644
---
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
@@ -18,19 +18,19 @@
package org.apache.shardingsphere.readwritesplitting.cluster;
import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import lombok.Setter;
import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import
org.apache.shardingsphere.mode.event.deliver.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.spi.PersistRepository;
/**
* Readwrite-splitting qualified data source deleted subscriber.
*/
-@RequiredArgsConstructor
-public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber
implements EventSubscriber {
+@Setter
+public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber
implements DeliverEventSubscriber {
- private final PersistRepository repository;
+ private PersistRepository repository;
/**
* Delete qualified data source.
diff --git
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java
deleted file mode 100644
index f2de2fff287..00000000000
---
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.readwritesplitting.cluster;
-
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-
-/**
- * Readwrite-splitting qualified data source deleted subscriber factory.
- */
-public final class
ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory implements
DeliverEventSubscriberFactory {
-
- @Override
- public EventSubscriber create(final PersistRepository repository, final
EventBusContext eventBusContext) {
- return new
ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository);
- }
-}
diff --git
a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
similarity index 94%
rename from
features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
rename to
features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
index 3b42a3fe23e..01fd8f0abad 100644
---
a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
+++
b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory
+org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriber
diff --git
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
index f3994bd0aa4..9f25aea262f 100644
---
a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
+++
b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
@@ -38,7 +38,8 @@ class
ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest {
@BeforeEach
void setUp() {
- subscriber = new
ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository);
+ subscriber = new
ReadwriteSplittingQualifiedDataSourceDeletedSubscriber();
+ subscriber.setRepository(repository);
}
@Test
diff --git
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
similarity index 74%
rename from
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
rename to
mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
index a43902000c2..ef278879965 100644
---
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
+++
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.event.deliver;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.spi.PersistRepository;
@@ -26,14 +25,12 @@ import org.apache.shardingsphere.mode.spi.PersistRepository;
* Deliver event subscriber factory.
*/
@SingletonSPI
-public interface DeliverEventSubscriberFactory {
+public interface DeliverEventSubscriber extends EventSubscriber {
/**
- * Create deliver event subscriber.
+ * Set persist repository.
*
- * @param repository cluster persist repository
- * @param eventBusContext event bus context
- * @return created event subscriber
+ * @param repository persist repository
*/
- EventSubscriber create(PersistRepository repository, EventBusContext
eventBusContext);
+ void setRepository(PersistRepository repository);
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index ebe7b89f7a9..647a0692093 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,13 +26,14 @@ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
-import
org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
+import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import
org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import
org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
@@ -45,7 +46,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import java.sql.SQLException;
import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.LinkedList;
/**
* Cluster context manager builder.
@@ -80,8 +81,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
new DataChangedEventListenerRegistry(contextManager,
getDatabaseNames(param,
contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
ClusterEventSubscriberRegistry eventSubscriberRegistry = new
ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
-
eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
- .map(each -> each.create(repository,
contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
+
eventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
eventSubscriberRegistry.register(new
ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
}
@@ -91,6 +91,15 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
:
metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames();
}
+ private Collection<EventSubscriber> createDeliverEventSubscribers(final
ClusterPersistRepository repository) {
+ Collection<EventSubscriber> result = new LinkedList<>();
+ for (DeliverEventSubscriber each :
ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriber.class)) {
+ each.setRepository(repository);
+ result.add(each);
+ }
+ return result;
+ }
+
@Override
public String getType() {
return "Cluster";