This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 257fcd9e54d Add DispatchEventSubscriber (#34069)
257fcd9e54d is described below

commit 257fcd9e54d81d2506e518723771847ded81c059
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 15 23:53:13 2024 +0800

    Add DispatchEventSubscriber (#34069)
---
 ...ubscriber.java => DispatchEventSubscriber.java} | 19 +++----------
 .../subscriber/type/CacheEvictedSubscriber.java    |  6 ++---
 .../type/ComputeNodeStateSubscriber.java           | 27 +++++++++++--------
 .../type/DatabaseDataChangedSubscriber.java        | 31 ++++++++++++----------
 .../GlobalRuleConfigurationEventSubscriber.java    |  4 +--
 .../type/ListenerAssistedSubscriber.java           |  4 +--
 .../subscriber/type/MetaDataChangedSubscriber.java |  4 +--
 .../type/ProcessListChangedSubscriber.java         | 26 +++++++++++-------
 .../subscriber/type/PropertiesEventSubscriber.java |  4 +--
 .../type/QualifiedDataSourceSubscriber.java        |  6 ++---
 .../subscriber/type/RuleItemChangedSubscriber.java | 15 +++++++----
 .../subscriber/type/StateChangedSubscriber.java    | 10 +++----
 .../type/StorageUnitEventSubscriber.java           |  4 +--
 13 files changed, 82 insertions(+), 78 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
similarity index 63%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
copy to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
index de796a986f2..2a915c5093d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
@@ -15,25 +15,12 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
+package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber;
 
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
-import 
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 
 /**
- * Cache evicted subscriber.
+ * Dispatch event subscriber.
  */
-public final class CacheEvictedSubscriber implements EventSubscriber {
-    
-    /**
-     * Callback of any {@link DispatchEvent}.
-     *
-     * @param ignored unused
-     */
-    @Subscribe
-    public void cleanCache(final DispatchEvent ignored) {
-        OrderedServicesCache.clearCache();
-    }
+public interface DispatchEventSubscriber extends EventSubscriber {
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
index de796a986f2..ef02905d65d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
@@ -18,14 +18,14 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
 import 
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 /**
  * Cache evicted subscriber.
  */
-public final class CacheEvictedSubscriber implements EventSubscriber {
+public final class CacheEvictedSubscriber implements DispatchEventSubscriber {
     
     /**
      * Callback of any {@link DispatchEvent}.
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
index 6eb8b274a64..cf282f34c58 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
@@ -18,24 +18,30 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.ComputeNodeInstanceStateChangedEvent;
 import org.apache.shardingsphere.mode.event.dispatch.state.compute.LabelsEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.WorkerIdEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOfflineEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 /**
  * Compute node state subscriber.
  */
-@RequiredArgsConstructor
-public final class ComputeNodeStateSubscriber implements EventSubscriber {
+public final class ComputeNodeStateSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
+    private final ComputeNodeInstanceContext computeNodeInstanceContext;
+    
+    public ComputeNodeStateSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        computeNodeInstanceContext = 
contextManager.getComputeNodeInstanceContext();
+    }
+    
     /**
      * Renew instance list.
      *
@@ -43,8 +49,7 @@ public final class ComputeNodeStateSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final InstanceOnlineEvent event) {
-        contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(
-                
contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
+        
computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
     }
     
     /**
@@ -54,7 +59,7 @@ public final class ComputeNodeStateSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final InstanceOfflineEvent event) {
-        
contextManager.getComputeNodeInstanceContext().deleteComputeNodeInstance(new 
ComputeNodeInstance(event.getInstanceMetaData()));
+        computeNodeInstanceContext.deleteComputeNodeInstance(new 
ComputeNodeInstance(event.getInstanceMetaData()));
     }
     
     /**
@@ -64,7 +69,7 @@ public final class ComputeNodeStateSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final ComputeNodeInstanceStateChangedEvent 
event) {
-        
contextManager.getComputeNodeInstanceContext().updateStatus(event.getInstanceId(),
 event.getStatus());
+        computeNodeInstanceContext.updateStatus(event.getInstanceId(), 
event.getStatus());
     }
     
     /**
@@ -74,7 +79,7 @@ public final class ComputeNodeStateSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final WorkerIdEvent event) {
-        
contextManager.getComputeNodeInstanceContext().updateWorkerId(event.getInstanceId(),
 event.getWorkerId());
+        computeNodeInstanceContext.updateWorkerId(event.getInstanceId(), 
event.getWorkerId());
     }
     
     /**
@@ -85,6 +90,6 @@ public final class ComputeNodeStateSubscriber implements 
EventSubscriber {
     @Subscribe
     public synchronized void renew(final LabelsEvent event) {
         // TODO labels may be empty
-        
contextManager.getComputeNodeInstanceContext().updateLabels(event.getInstanceId(),
 event.getLabels());
+        computeNodeInstanceContext.updateLabels(event.getInstanceId(), 
event.getLabels());
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
index c092df8e898..4264ad3dcc5 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
@@ -18,9 +18,6 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataAddedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataDeletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataAddedEvent;
@@ -28,14 +25,20 @@ import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataDel
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataChangedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataDeletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.data.TableDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import 
org.apache.shardingsphere.mode.metadata.manager.ShardingSphereDatabaseDataManager;
 
 /**
  * Database data changed subscriber.
  */
-@RequiredArgsConstructor
-public final class DatabaseDataChangedSubscriber implements EventSubscriber {
+public final class DatabaseDataChangedSubscriber implements 
DispatchEventSubscriber {
     
-    private final ContextManager contextManager;
+    private final ShardingSphereDatabaseDataManager databaseManager;
+    
+    public DatabaseDataChangedSubscriber(final ContextManager contextManager) {
+        databaseManager = 
contextManager.getMetaDataContextManager().getDatabaseManager();
+    }
     
     /**
      * Renew to persist ShardingSphere database data.
@@ -44,7 +47,7 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final DatabaseDataAddedEvent event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereDatabaseData(event.getDatabaseName());
+        databaseManager.addShardingSphereDatabaseData(event.getDatabaseName());
     }
     
     /**
@@ -54,7 +57,7 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final DatabaseDataDeletedEvent event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereDatabaseData(event.getDatabaseName());
+        
databaseManager.dropShardingSphereDatabaseData(event.getDatabaseName());
     }
     
     /**
@@ -64,7 +67,7 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final SchemaDataAddedEvent event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereSchemaData(event.getDatabaseName(),
 event.getSchemaName());
+        databaseManager.addShardingSphereSchemaData(event.getDatabaseName(), 
event.getSchemaName());
     }
     
     /**
@@ -74,7 +77,7 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final SchemaDataDeletedEvent event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereSchemaData(event.getDatabaseName(),
 event.getSchemaName());
+        databaseManager.dropShardingSphereSchemaData(event.getDatabaseName(), 
event.getSchemaName());
     }
     
     /**
@@ -85,10 +88,10 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
     @Subscribe
     public synchronized void renew(final TableDataChangedEvent event) {
         if (null != event.getAddedTable()) {
-            
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereTableData(event.getDatabaseName(),
 event.getSchemaName(), event.getAddedTable());
+            
databaseManager.addShardingSphereTableData(event.getDatabaseName(), 
event.getSchemaName(), event.getAddedTable());
         }
         if (null != event.getDeletedTable()) {
-            
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereTableData(event.getDatabaseName(),
 event.getSchemaName(), event.getDeletedTable());
+            
databaseManager.dropShardingSphereTableData(event.getDatabaseName(), 
event.getSchemaName(), event.getDeletedTable());
         }
     }
     
@@ -99,7 +102,7 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final ShardingSphereRowDataChangedEvent 
event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().alterShardingSphereRowData(event.getDatabaseName(),
 event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+        databaseManager.alterShardingSphereRowData(event.getDatabaseName(), 
event.getSchemaName(), event.getTableName(), event.getYamlRowData());
     }
     
     /**
@@ -109,6 +112,6 @@ public final class DatabaseDataChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public synchronized void renew(final ShardingSphereRowDataDeletedEvent 
event) {
-        
contextManager.getMetaDataContextManager().getDatabaseManager().deleteShardingSphereRowData(event.getDatabaseName(),
 event.getSchemaName(), event.getTableName(), event.getUniqueKey());
+        databaseManager.deleteShardingSphereRowData(event.getDatabaseName(), 
event.getSchemaName(), event.getTableName(), event.getUniqueKey());
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
index eeff01817b0..d9ff986b01e 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
@@ -22,9 +22,9 @@ import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import 
org.apache.shardingsphere.mode.event.dispatch.config.AlterGlobalRuleConfigurationEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 import org.apache.shardingsphere.mode.spi.RuleConfigurationPersistDecorator;
 
 import java.util.Optional;
@@ -33,7 +33,7 @@ import java.util.Optional;
  * Global rule configuration event subscriber.
  */
 @RequiredArgsConstructor
-public final class GlobalRuleConfigurationEventSubscriber implements 
EventSubscriber {
+public final class GlobalRuleConfigurationEventSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
index cb09c7d4378..c2d5b80768f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
@@ -20,13 +20,13 @@ package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import 
org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -35,7 +35,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
  * Listener assisted subscriber.
  */
 @RequiredArgsConstructor
-public final class ListenerAssistedSubscriber implements EventSubscriber {
+public final class ListenerAssistedSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
index d096dbd7325..7ed94f407b9 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
@@ -22,7 +22,6 @@ import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
@@ -31,6 +30,7 @@ import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCr
 import 
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -38,7 +38,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 /**
  * Meta data changed subscriber.
  */
-public final class MetaDataChangedSubscriber implements EventSubscriber {
+public final class MetaDataChangedSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
index ba8bd0a3cf3..b64dec3645d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
@@ -18,20 +18,20 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.sql.process.Process;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
 import 
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
 
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -40,12 +40,19 @@ import java.util.Collection;
 /**
  * Process list changed subscriber.
  */
-@RequiredArgsConstructor
-public final class ProcessListChangedSubscriber implements EventSubscriber {
+public final class ProcessListChangedSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
-    private final YamlProcessListSwapper swapper = new 
YamlProcessListSwapper();
+    private final PersistRepository repository;
+    
+    private final YamlProcessListSwapper swapper;
+    
+    public ProcessListChangedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        repository = contextManager.getPersistServiceFacade().getRepository();
+        swapper = new YamlProcessListSwapper();
+    }
     
     /**
      * Report local processes.
@@ -59,10 +66,9 @@ public final class ProcessListChangedSubscriber implements 
EventSubscriber {
         }
         Collection<Process> processes = 
ProcessRegistry.getInstance().listAll();
         if (!processes.isEmpty()) {
-            contextManager.getPersistServiceFacade().getRepository().persist(
-                    ProcessNode.getProcessListInstancePath(event.getTaskId(), 
event.getInstanceId()), 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+            
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(), 
event.getInstanceId()), 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
         }
-        
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
 event.getTaskId()));
+        
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
 event.getTaskId()));
     }
     
     /**
@@ -93,7 +99,7 @@ public final class ProcessListChangedSubscriber implements 
EventSubscriber {
                 each.cancel();
             }
         }
-        
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
+        
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessId()));
     }
     
     /**
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
index 9f20377a7d0..0665147e703 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
@@ -20,15 +20,15 @@ package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber
 import com.google.common.base.Preconditions;
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import 
org.apache.shardingsphere.mode.event.dispatch.config.AlterPropertiesEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 /**
  * Properties event subscriber.
  */
 @RequiredArgsConstructor
-public final class PropertiesEventSubscriber implements EventSubscriber {
+public final class PropertiesEventSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
index a354fa0a0ca..2de68b4e98b 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
@@ -23,15 +23,15 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import 
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.storage.QualifiedDataSourceStateEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 /**
  * Qualified data source subscriber.
  */
 @RequiredArgsConstructor
-public class QualifiedDataSourceSubscriber implements EventSubscriber {
+public class QualifiedDataSourceSubscriber implements DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
index 3d84d50e1a0..de4bcd1c403 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
@@ -21,8 +21,9 @@ import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.event.dispatch.rule.alter.AlterRuleItemEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.rule.drop.DropRuleItemEvent;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import org.apache.shardingsphere.mode.metadata.manager.RuleItemManager;
 
 import java.sql.SQLException;
 
@@ -30,9 +31,13 @@ import java.sql.SQLException;
  * Rule item changed subscriber.
  */
 @RequiredArgsConstructor
-public final class RuleItemChangedSubscriber implements EventSubscriber {
+public final class RuleItemChangedSubscriber implements 
DispatchEventSubscriber {
     
-    private final ContextManager contextManager;
+    private final RuleItemManager ruleItemManager;
+    
+    public RuleItemChangedSubscriber(final ContextManager contextManager) {
+        ruleItemManager = 
contextManager.getMetaDataContextManager().getRuleItemManager();
+    }
     
     /**
      * Renew with alter rule item.
@@ -42,7 +47,7 @@ public final class RuleItemChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public void renew(final AlterRuleItemEvent event) throws SQLException {
-        
contextManager.getMetaDataContextManager().getRuleItemManager().alterRuleItem(event);
+        ruleItemManager.alterRuleItem(event);
     }
     
     /**
@@ -53,6 +58,6 @@ public final class RuleItemChangedSubscriber implements 
EventSubscriber {
      */
     @Subscribe
     public void renew(final DropRuleItemEvent event) throws SQLException {
-        
contextManager.getMetaDataContextManager().getRuleItemManager().dropRuleItem(event);
+        ruleItemManager.dropRuleItem(event);
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
index 05f0d9d2cb5..809b3bc4000 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
@@ -18,21 +18,19 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
 
 import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.event.dispatch.state.cluster.ClusterStateEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 /**
  * State changed subscriber.
  */
-public final class StateChangedSubscriber implements EventSubscriber {
+@RequiredArgsConstructor
+public final class StateChangedSubscriber implements DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     
-    public StateChangedSubscriber(final ContextManager contextManager) {
-        this.contextManager = contextManager;
-    }
-    
     /**
      * Renew cluster state.
      *
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
index ea167f1e094..cf8627d6d9d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
@@ -21,11 +21,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import 
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent;
 import 
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
 
 import java.util.Collections;
 
@@ -33,7 +33,7 @@ import java.util.Collections;
  * Storage unit event subscriber.
  */
 @RequiredArgsConstructor
-public final class StorageUnitEventSubscriber implements EventSubscriber {
+public final class StorageUnitEventSubscriber implements 
DispatchEventSubscriber {
     
     private final ContextManager contextManager;
     


Reply via email to