This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ed0db5385e3 Add DeliverEventSubscriber (#34067)
ed0db5385e3 is described below

commit ed0db5385e3848699990a8252cddfb0bb462f04c
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 15 20:57:14 2024 +0800

    Add DeliverEventSubscriber (#34067)
    
    * Add DeliverEventSubscriber
    
    * Add DeliverEventSubscriber
    
    * Add DeliverEventSubscriber
---
 ...ittingQualifiedDataSourceDeletedSubscriber.java | 10 +++----
 ...ualifiedDataSourceDeletedSubscriberFactory.java | 34 ----------------------
 ...here.mode.event.deliver.DeliverEventSubscriber} |  2 +-
 ...ngQualifiedDataSourceDeletedSubscriberTest.java |  3 +-
 ...berFactory.java => DeliverEventSubscriber.java} | 11 +++----
 .../cluster/ClusterContextManagerBuilder.java      | 19 ++++++++----
 6 files changed, 26 insertions(+), 53 deletions(-)

diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
index 70ffc7bc4a8..c24ee7f195e 100644
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
+++ b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriber.java
@@ -18,19 +18,19 @@
 package org.apache.shardingsphere.readwritesplitting.cluster;
 
 import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import lombok.Setter;
 import org.apache.shardingsphere.metadata.persist.node.QualifiedDataSourceNode;
+import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
 import org.apache.shardingsphere.mode.event.deliver.QualifiedDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 /**
  * Readwrite-splitting qualified data source deleted subscriber.
  */
-@RequiredArgsConstructor
-public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements EventSubscriber {
+@Setter
+public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriber implements DeliverEventSubscriber {
     
-    private final PersistRepository repository;
+    private PersistRepository repository;
     
     /**
      * Delete qualified data source.
diff --git a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java
deleted file mode 100644
index f2de2fff287..00000000000
--- a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.readwritesplitting.cluster;
-
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
-import org.apache.shardingsphere.mode.spi.PersistRepository;
-
-/**
- * Readwrite-splitting qualified data source deleted subscriber factory.
- */
-public final class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory implements DeliverEventSubscriberFactory {
-    
-    @Override
-    public EventSubscriber create(final PersistRepository repository, final EventBusContext eventBusContext) {
-        return new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository);
-    }
-}
diff --git a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
similarity index 94%
rename from features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
rename to features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
index 3b42a3fe23e..01fd8f0abad 100644
--- a/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory
+++ b/features/readwrite-splitting/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriberFactory
+org.apache.shardingsphere.readwritesplitting.cluster.ReadwriteSplittingQualifiedDataSourceDeletedSubscriber
diff --git a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
index f3994bd0aa4..9f25aea262f 100644
--- a/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
+++ b/features/readwrite-splitting/core/src/test/java/org/apache/shardingsphere/readwritesplitting/cluster/ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest.java
@@ -38,7 +38,8 @@ class ReadwriteSplittingQualifiedDataSourceDeletedSubscriberTest {
     
     @BeforeEach
     void setUp() {
-        subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber(repository);
+        subscriber = new ReadwriteSplittingQualifiedDataSourceDeletedSubscriber();
+        subscriber.setRepository(repository);
     }
     
     @Test
diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
similarity index 74%
rename from mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
rename to mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
index a43902000c2..ef278879965 100644
--- a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriberFactory.java
+++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/deliver/DeliverEventSubscriber.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.mode.event.deliver;
 
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
@@ -26,14 +25,12 @@ import org.apache.shardingsphere.mode.spi.PersistRepository;
  * Deliver event subscriber factory.
  */
 @SingletonSPI
-public interface DeliverEventSubscriberFactory {
+public interface DeliverEventSubscriber extends EventSubscriber {
     
     /**
-     * Create deliver event subscriber.
+     * Set persist repository.
      *
-     * @param repository cluster persist repository
-     * @param eventBusContext event bus context
-     * @return created event subscriber
+     * @param repository persist repository
      */
-    EventSubscriber create(PersistRepository repository, EventBusContext eventBusContext);
+    void setRepository(PersistRepository repository);
 }
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index ebe7b89f7a9..647a0692093 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,13 +26,14 @@ import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
+import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriber;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import org.apache.shardingsphere.mode.event.deliver.DeliverEventSubscriberFactory;
+import org.apache.shardingsphere.mode.manager.cluster.event.ClusterEventSubscriberRegistry;
 import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.DataChangedEventListenerRegistry;
 import org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.ClusterDispatchEventSubscriberRegistry;
 import org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
@@ -45,7 +46,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.LinkedList;
 
 /**
  * Cluster context manager builder.
@@ -80,8 +81,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
         contextManager.getComputeNodeInstanceContext().getAllClusterInstances().addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
         new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
         ClusterEventSubscriberRegistry eventSubscriberRegistry = new ClusterEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
-        eventSubscriberRegistry.register(ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriberFactory.class).stream()
-                .map(each -> each.create(repository, contextManager.getComputeNodeInstanceContext().getEventBusContext())).collect(Collectors.toList()));
+        eventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
         eventSubscriberRegistry.register(new ClusterDispatchEventSubscriberRegistry(contextManager).getSubscribers());
     }
     
@@ -91,6 +91,15 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
                 : metaDataPersistService.getDatabaseMetaDataFacade().getDatabase().loadAllDatabaseNames();
     }
     
+    private Collection<EventSubscriber> createDeliverEventSubscribers(final ClusterPersistRepository repository) {
+        Collection<EventSubscriber> result = new LinkedList<>();
+        for (DeliverEventSubscriber each : ShardingSphereServiceLoader.getServiceInstances(DeliverEventSubscriber.class)) {
+            each.setRepository(repository);
+            result.add(each);
+        }
+        return result;
+    }
+    
     @Override
     public String getType() {
         return "Cluster";

Reply via email to