This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 257fcd9e54d Add DispatchEventSubscriber (#34069)
257fcd9e54d is described below
commit 257fcd9e54d81d2506e518723771847ded81c059
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 15 23:53:13 2024 +0800
Add DispatchEventSubscriber (#34069)
---
...ubscriber.java => DispatchEventSubscriber.java} | 19 +++----------
.../subscriber/type/CacheEvictedSubscriber.java | 6 ++---
.../type/ComputeNodeStateSubscriber.java | 27 +++++++++++--------
.../type/DatabaseDataChangedSubscriber.java | 31 ++++++++++++----------
.../GlobalRuleConfigurationEventSubscriber.java | 4 +--
.../type/ListenerAssistedSubscriber.java | 4 +--
.../subscriber/type/MetaDataChangedSubscriber.java | 4 +--
.../type/ProcessListChangedSubscriber.java | 26 +++++++++++-------
.../subscriber/type/PropertiesEventSubscriber.java | 4 +--
.../type/QualifiedDataSourceSubscriber.java | 6 ++---
.../subscriber/type/RuleItemChangedSubscriber.java | 15 +++++++----
.../subscriber/type/StateChangedSubscriber.java | 10 +++----
.../type/StorageUnitEventSubscriber.java | 4 +--
13 files changed, 82 insertions(+), 78 deletions(-)
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
similarity index 63%
copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
index de796a986f2..2a915c5093d 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/DispatchEventSubscriber.java
@@ -15,25 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
+package org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber;
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
-import
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
/**
- * Cache evicted subscriber.
+ * Dispatch event subscriber.
*/
-public final class CacheEvictedSubscriber implements EventSubscriber {
-
- /**
- * Callback of any {@link DispatchEvent}.
- *
- * @param ignored unused
- */
- @Subscribe
- public void cleanCache(final DispatchEvent ignored) {
- OrderedServicesCache.clearCache();
- }
+public interface DispatchEventSubscriber extends EventSubscriber {
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
index de796a986f2..ef02905d65d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/CacheEvictedSubscriber.java
@@ -18,14 +18,14 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
import
org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.event.dispatch.DispatchEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
/**
* Cache evicted subscriber.
*/
-public final class CacheEvictedSubscriber implements EventSubscriber {
+public final class CacheEvictedSubscriber implements DispatchEventSubscriber {
/**
* Callback of any {@link DispatchEvent}.
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
index 6eb8b274a64..cf282f34c58 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ComputeNodeStateSubscriber.java
@@ -18,24 +18,30 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.ComputeNodeInstanceStateChangedEvent;
import org.apache.shardingsphere.mode.event.dispatch.state.compute.LabelsEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.WorkerIdEvent;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOfflineEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.instance.InstanceOnlineEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
/**
* Compute node state subscriber.
*/
-@RequiredArgsConstructor
-public final class ComputeNodeStateSubscriber implements EventSubscriber {
+public final class ComputeNodeStateSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
+ private final ComputeNodeInstanceContext computeNodeInstanceContext;
+
+ public ComputeNodeStateSubscriber(final ContextManager contextManager) {
+ this.contextManager = contextManager;
+ computeNodeInstanceContext =
contextManager.getComputeNodeInstanceContext();
+ }
+
/**
* Renew instance list.
*
@@ -43,8 +49,7 @@ public final class ComputeNodeStateSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
- contextManager.getComputeNodeInstanceContext().addComputeNodeInstance(
-
contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
+
computeNodeInstanceContext.addComputeNodeInstance(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceMetaData()));
}
/**
@@ -54,7 +59,7 @@ public final class ComputeNodeStateSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final InstanceOfflineEvent event) {
-
contextManager.getComputeNodeInstanceContext().deleteComputeNodeInstance(new
ComputeNodeInstance(event.getInstanceMetaData()));
+ computeNodeInstanceContext.deleteComputeNodeInstance(new
ComputeNodeInstance(event.getInstanceMetaData()));
}
/**
@@ -64,7 +69,7 @@ public final class ComputeNodeStateSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final ComputeNodeInstanceStateChangedEvent
event) {
-
contextManager.getComputeNodeInstanceContext().updateStatus(event.getInstanceId(),
event.getStatus());
+ computeNodeInstanceContext.updateStatus(event.getInstanceId(),
event.getStatus());
}
/**
@@ -74,7 +79,7 @@ public final class ComputeNodeStateSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final WorkerIdEvent event) {
-
contextManager.getComputeNodeInstanceContext().updateWorkerId(event.getInstanceId(),
event.getWorkerId());
+ computeNodeInstanceContext.updateWorkerId(event.getInstanceId(),
event.getWorkerId());
}
/**
@@ -85,6 +90,6 @@ public final class ComputeNodeStateSubscriber implements
EventSubscriber {
@Subscribe
public synchronized void renew(final LabelsEvent event) {
// TODO labels may be empty
-
contextManager.getComputeNodeInstanceContext().updateLabels(event.getInstanceId(),
event.getLabels());
+ computeNodeInstanceContext.updateLabels(event.getInstanceId(),
event.getLabels());
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
index c092df8e898..4264ad3dcc5 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/DatabaseDataChangedSubscriber.java
@@ -18,9 +18,6 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataAddedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.DatabaseDataDeletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataAddedEvent;
@@ -28,14 +25,20 @@ import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.SchemaDataDel
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataChangedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.ShardingSphereRowDataDeletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.data.TableDataChangedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import
org.apache.shardingsphere.mode.metadata.manager.ShardingSphereDatabaseDataManager;
/**
* Database data changed subscriber.
*/
-@RequiredArgsConstructor
-public final class DatabaseDataChangedSubscriber implements EventSubscriber {
+public final class DatabaseDataChangedSubscriber implements
DispatchEventSubscriber {
- private final ContextManager contextManager;
+ private final ShardingSphereDatabaseDataManager databaseManager;
+
+ public DatabaseDataChangedSubscriber(final ContextManager contextManager) {
+ databaseManager =
contextManager.getMetaDataContextManager().getDatabaseManager();
+ }
/**
* Renew to persist ShardingSphere database data.
@@ -44,7 +47,7 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final DatabaseDataAddedEvent event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereDatabaseData(event.getDatabaseName());
+ databaseManager.addShardingSphereDatabaseData(event.getDatabaseName());
}
/**
@@ -54,7 +57,7 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final DatabaseDataDeletedEvent event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereDatabaseData(event.getDatabaseName());
+
databaseManager.dropShardingSphereDatabaseData(event.getDatabaseName());
}
/**
@@ -64,7 +67,7 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final SchemaDataAddedEvent event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereSchemaData(event.getDatabaseName(),
event.getSchemaName());
+ databaseManager.addShardingSphereSchemaData(event.getDatabaseName(),
event.getSchemaName());
}
/**
@@ -74,7 +77,7 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final SchemaDataDeletedEvent event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereSchemaData(event.getDatabaseName(),
event.getSchemaName());
+ databaseManager.dropShardingSphereSchemaData(event.getDatabaseName(),
event.getSchemaName());
}
/**
@@ -85,10 +88,10 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
@Subscribe
public synchronized void renew(final TableDataChangedEvent event) {
if (null != event.getAddedTable()) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().addShardingSphereTableData(event.getDatabaseName(),
event.getSchemaName(), event.getAddedTable());
+
databaseManager.addShardingSphereTableData(event.getDatabaseName(),
event.getSchemaName(), event.getAddedTable());
}
if (null != event.getDeletedTable()) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().dropShardingSphereTableData(event.getDatabaseName(),
event.getSchemaName(), event.getDeletedTable());
+
databaseManager.dropShardingSphereTableData(event.getDatabaseName(),
event.getSchemaName(), event.getDeletedTable());
}
}
@@ -99,7 +102,7 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataChangedEvent
event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().alterShardingSphereRowData(event.getDatabaseName(),
event.getSchemaName(), event.getTableName(), event.getYamlRowData());
+ databaseManager.alterShardingSphereRowData(event.getDatabaseName(),
event.getSchemaName(), event.getTableName(), event.getYamlRowData());
}
/**
@@ -109,6 +112,6 @@ public final class DatabaseDataChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final ShardingSphereRowDataDeletedEvent
event) {
-
contextManager.getMetaDataContextManager().getDatabaseManager().deleteShardingSphereRowData(event.getDatabaseName(),
event.getSchemaName(), event.getTableName(), event.getUniqueKey());
+ databaseManager.deleteShardingSphereRowData(event.getDatabaseName(),
event.getSchemaName(), event.getTableName(), event.getUniqueKey());
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
index eeff01817b0..d9ff986b01e 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/GlobalRuleConfigurationEventSubscriber.java
@@ -22,9 +22,9 @@ import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.mode.event.dispatch.config.AlterGlobalRuleConfigurationEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import org.apache.shardingsphere.mode.spi.RuleConfigurationPersistDecorator;
import java.util.Optional;
@@ -33,7 +33,7 @@ import java.util.Optional;
* Global rule configuration event subscriber.
*/
@RequiredArgsConstructor
-public final class GlobalRuleConfigurationEventSubscriber implements
EventSubscriber {
+public final class GlobalRuleConfigurationEventSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
index cb09c7d4378..c2d5b80768f 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ListenerAssistedSubscriber.java
@@ -20,13 +20,13 @@ package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import
org.apache.shardingsphere.mode.event.dispatch.assisted.CreateDatabaseListenerAssistedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.assisted.DropDatabaseListenerAssistedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.listener.type.DatabaseMetaDataChangedListener;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -35,7 +35,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
* Listener assisted subscriber.
*/
@RequiredArgsConstructor
-public final class ListenerAssistedSubscriber implements EventSubscriber {
+public final class ListenerAssistedSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
index d096dbd7325..7ed94f407b9 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/MetaDataChangedSubscriber.java
@@ -22,7 +22,6 @@ import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.SchemaDeletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.table.TableCreatedOrAlteredEvent;
@@ -31,6 +30,7 @@ import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewCr
import
org.apache.shardingsphere.mode.event.dispatch.metadata.schema.view.ViewDroppedEvent;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.persist.service.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.metadata.refresher.ShardingSphereStatisticsRefreshEngine;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
/**
* Meta data changed subscriber.
*/
-public final class MetaDataChangedSubscriber implements EventSubscriber {
+public final class MetaDataChangedSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
index ba8bd0a3cf3..b64dec3645d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/ProcessListChangedSubscriber.java
@@ -18,20 +18,20 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import
org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessCompletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.KillLocalProcessEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesCompletedEvent;
import
org.apache.shardingsphere.mode.event.dispatch.state.compute.ReportLocalProcessesEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
import java.sql.SQLException;
import java.sql.Statement;
@@ -40,12 +40,19 @@ import java.util.Collection;
/**
* Process list changed subscriber.
*/
-@RequiredArgsConstructor
-public final class ProcessListChangedSubscriber implements EventSubscriber {
+public final class ProcessListChangedSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
- private final YamlProcessListSwapper swapper = new
YamlProcessListSwapper();
+ private final PersistRepository repository;
+
+ private final YamlProcessListSwapper swapper;
+
+ public ProcessListChangedSubscriber(final ContextManager contextManager) {
+ this.contextManager = contextManager;
+ repository = contextManager.getPersistServiceFacade().getRepository();
+ swapper = new YamlProcessListSwapper();
+ }
/**
* Report local processes.
@@ -59,10 +66,9 @@ public final class ProcessListChangedSubscriber implements
EventSubscriber {
}
Collection<Process> processes =
ProcessRegistry.getInstance().listAll();
if (!processes.isEmpty()) {
- contextManager.getPersistServiceFacade().getRepository().persist(
- ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
+
repository.persist(ProcessNode.getProcessListInstancePath(event.getTaskId(),
event.getInstanceId()),
YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
}
-
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
+
repository.delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(),
event.getTaskId()));
}
/**
@@ -93,7 +99,7 @@ public final class ProcessListChangedSubscriber implements
EventSubscriber {
each.cancel();
}
}
-
contextManager.getPersistServiceFacade().getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
+
repository.delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
event.getProcessId()));
}
/**
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
index 9f20377a7d0..0665147e703 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/PropertiesEventSubscriber.java
@@ -20,15 +20,15 @@ package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber
import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.mode.event.dispatch.config.AlterPropertiesEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
/**
* Properties event subscriber.
*/
@RequiredArgsConstructor
-public final class PropertiesEventSubscriber implements EventSubscriber {
+public final class PropertiesEventSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
index a354fa0a0ca..2de68b4e98b 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/QualifiedDataSourceSubscriber.java
@@ -23,15 +23,15 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.event.dispatch.state.storage.QualifiedDataSourceStateEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
/**
* Qualified data source subscriber.
*/
@RequiredArgsConstructor
-public class QualifiedDataSourceSubscriber implements EventSubscriber {
+public class QualifiedDataSourceSubscriber implements DispatchEventSubscriber {
private final ContextManager contextManager;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
index 3d84d50e1a0..de4bcd1c403 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/RuleItemChangedSubscriber.java
@@ -21,8 +21,9 @@ import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.event.dispatch.rule.alter.AlterRuleItemEvent;
import
org.apache.shardingsphere.mode.event.dispatch.rule.drop.DropRuleItemEvent;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
+import org.apache.shardingsphere.mode.metadata.manager.RuleItemManager;
import java.sql.SQLException;
@@ -30,9 +31,13 @@ import java.sql.SQLException;
* Rule item changed subscriber.
*/
@RequiredArgsConstructor
-public final class RuleItemChangedSubscriber implements EventSubscriber {
+public final class RuleItemChangedSubscriber implements
DispatchEventSubscriber {
- private final ContextManager contextManager;
+ private final RuleItemManager ruleItemManager;
+
+ public RuleItemChangedSubscriber(final ContextManager contextManager) {
+ ruleItemManager =
contextManager.getMetaDataContextManager().getRuleItemManager();
+ }
/**
* Renew with alter rule item.
@@ -42,7 +47,7 @@ public final class RuleItemChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public void renew(final AlterRuleItemEvent event) throws SQLException {
-
contextManager.getMetaDataContextManager().getRuleItemManager().alterRuleItem(event);
+ ruleItemManager.alterRuleItem(event);
}
/**
@@ -53,6 +58,6 @@ public final class RuleItemChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public void renew(final DropRuleItemEvent event) throws SQLException {
-
contextManager.getMetaDataContextManager().getRuleItemManager().dropRuleItem(event);
+ ruleItemManager.dropRuleItem(event);
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
index 05f0d9d2cb5..809b3bc4000 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StateChangedSubscriber.java
@@ -18,21 +18,19 @@
package
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.type;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.event.dispatch.state.cluster.ClusterStateEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
/**
* State changed subscriber.
*/
-public final class StateChangedSubscriber implements EventSubscriber {
+@RequiredArgsConstructor
+public final class StateChangedSubscriber implements DispatchEventSubscriber {
private final ContextManager contextManager;
- public StateChangedSubscriber(final ContextManager contextManager) {
- this.contextManager = contextManager;
- }
-
/**
* Renew cluster state.
*
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
index ea167f1e094..cf8627d6d9d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/dispatch/subscriber/type/StorageUnitEventSubscriber.java
@@ -21,11 +21,11 @@ import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitAlteredEvent;
import
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitRegisteredEvent;
import
org.apache.shardingsphere.mode.event.dispatch.datasource.unit.StorageUnitUnregisteredEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.event.dispatch.subscriber.DispatchEventSubscriber;
import java.util.Collections;
@@ -33,7 +33,7 @@ import java.util.Collections;
* Storage unit event subscriber.
*/
@RequiredArgsConstructor
-public final class StorageUnitEventSubscriber implements EventSubscriber {
+public final class StorageUnitEventSubscriber implements
DispatchEventSubscriber {
private final ContextManager contextManager;