This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de2c40ff8b Refactor QualifiedDataSourceWatcher and Subscriber (#31625)
4de2c40ff8b is described below

commit 4de2c40ff8b028f22fc3eee7641d4f440868a676
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jun 7 17:33:48 2024 +0800

    Refactor QualifiedDataSourceWatcher and Subscriber (#31625)
    
    * Refactor QualifiedDataSourceWatcher and Subscriber
    
    * Refactor QualifiedDataSourceWatcher and Subscriber
---
 .../mode/storage/QualifiedDataSourceState.java     |  2 +-
 .../yaml/YamlQualifiedDataSourceStatusSwapper.java |  2 +-
 .../cluster/ClusterContextManagerBuilder.java      |  2 +-
 .../event/QualifiedDataSourceStateEvent.java}      |  4 +-
 .../subscriber/QualifiedDataSourceSubscriber.java  | 53 ++++++++++++++++
 .../QualifiedDataSourceStatusSubscriber.java       |  2 +-
 .../watcher/QualifiedDataSourceWatcher.java}       | 12 ++--
 .../subscriber/ClusterEventSubscriberRegistry.java |  4 +-
 .../subscriber/StateChangedSubscriber.java         | 21 -------
 ....cluster.coordinator.registry.GovernanceWatcher |  2 +-
 .../QualifiedDataSourceSubscriberTest.java}        | 73 +++++-----------------
 .../QualifiedDataSourceStateSubscriberTest.java    |  1 +
 ...st.java => QualifiedDataSourceWatcherTest.java} | 19 +++---
 .../subscriber/StateChangedSubscriberTest.java     | 19 ------
 14 files changed, 97 insertions(+), 119 deletions(-)

diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
index c7401ce5159..48cf886986e 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
@@ -28,5 +28,5 @@ import 
org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 @Getter
 public final class QualifiedDataSourceState {
     
-    private final DataSourceState status;
+    private final DataSourceState state;
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
index f75545cdff8..7a99ad42671 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
@@ -29,7 +29,7 @@ public final class YamlQualifiedDataSourceStatusSwapper 
implements YamlConfigura
     @Override
     public YamlQualifiedDataSourceStatus swapToYamlConfiguration(final 
QualifiedDataSourceState data) {
         YamlQualifiedDataSourceStatus result = new 
YamlQualifiedDataSourceStatus();
-        result.setStatus(data.getStatus().name());
+        result.setStatus(data.getState().name());
         return result;
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index dc790d7f747..67ffdc0701b 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -35,7 +35,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaD
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
 import 
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
similarity index 92%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
index 73f8bc70ffc..4a2ee61f70f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -28,7 +28,7 @@ import 
org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
  */
 @RequiredArgsConstructor
 @Getter
-public final class StorageNodeChangedEvent implements GovernanceEvent {
+public final class QualifiedDataSourceStateEvent implements GovernanceEvent {
     
     private final QualifiedDataSource qualifiedDataSource;
     
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java
new file mode 100644
index 00000000000..0444ff27f5c
--- /dev/null
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
+import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
+
+/**
+ * Qualified data source subscriber.
+ */
+@RequiredArgsConstructor
+public class QualifiedDataSourceSubscriber implements EventSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Renew disabled data source names.
+     *
+     * @param event qualified data source state event
+     */
+    @Subscribe
+    public synchronized void renew(final QualifiedDataSourceStateEvent event) {
+        QualifiedDataSource qualifiedDataSource = event.getQualifiedDataSource();
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDataSource.getDatabaseName())) {
+            return;
+        }
+        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDataSource.getDatabaseName());
+        for (StaticDataSourceRuleAttribute each : database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
+            each.updateStatus(qualifiedDataSource, event.getStatus().getState());
+        }
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
similarity index 97%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
index 1dd359778a6..21ff04409c5 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal;
 
 import com.google.common.eventbus.Subscribe;
 import lombok.RequiredArgsConstructor;
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
similarity index 83%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
index aeb4e05fd54..9097c81cb0c 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher;
 
 import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
 import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
 import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
 import 
org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
@@ -36,9 +36,9 @@ import java.util.Collections;
 import java.util.Optional;
 
 /**
- * Qualified data source changed watcher.
+ * Qualified data source watcher.
  */
-public final class QualifiedDataSourceChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
+public final class QualifiedDataSourceWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
     public Collection<String> getWatchingKeys() {
@@ -57,8 +57,8 @@ public final class QualifiedDataSourceChangedWatcher 
implements GovernanceWatche
         }
         Optional<QualifiedDataSource> qualifiedDataSource = 
QualifiedDataSourceNode.extractQualifiedDataSource(event.getKey());
         if (qualifiedDataSource.isPresent()) {
-            QualifiedDataSourceState status = new 
YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
 YamlQualifiedDataSourceStatus.class));
-            return Optional.of(new 
StorageNodeChangedEvent(qualifiedDataSource.get(), status));
+            QualifiedDataSourceState state = new 
YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
 YamlQualifiedDataSourceStatus.class));
+            return Optional.of(new 
QualifiedDataSourceStateEvent(qualifiedDataSource.get(), state));
         }
         return Optional.empty();
     }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index e295c2fda38..11b360f857d 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.compute.online.subscriber.ComputeNodeOnlineSubscriber;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.QualifiedDataSourceSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
@@ -40,6 +41,7 @@ public final class ClusterEventSubscriberRegistry extends 
EventSubscriberRegistr
                 new DatabaseChangedSubscriber(contextManager),
                 new ProcessListChangedSubscriber(contextManager, repository),
                 new CacheEvictedSubscriber(),
-                new ComputeNodeOnlineSubscriber(contextManager));
+                new ComputeNodeOnlineSubscriber(contextManager),
+                new QualifiedDataSourceSubscriber(contextManager));
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index f7eae73b507..226cbca0e90 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -18,16 +18,12 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import com.google.common.eventbus.Subscribe;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import 
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 
 /**
  * State changed subscriber.
@@ -41,23 +37,6 @@ public final class StateChangedSubscriber implements 
EventSubscriber {
         this.contextManager = contextManager;
     }
     
-    /**
-     * Renew disabled data source names.
-     * 
-     * @param event Storage node changed event
-     */
-    @Subscribe
-    public synchronized void renew(final StorageNodeChangedEvent event) {
-        QualifiedDataSource qualifiedDataSource = 
event.getQualifiedDataSource();
-        if 
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDataSource.getDatabaseName()))
 {
-            return;
-        }
-        ShardingSphereDatabase database = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDataSource.getDatabaseName());
-        for (StaticDataSourceRuleAttribute each : 
database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
-            each.updateStatus(qualifiedDataSource, 
event.getStatus().getStatus());
-        }
-    }
-    
     /**
      * Renew cluster state.
      * 
diff --git 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 30feb1f9011..95bc0d0c8d1 100644
--- 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher.QualifiedDataSourceChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher.QualifiedDataSourceWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
similarity index 67%
copy from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
copy to 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
index 46e06cd4ee1..2afaca2611e 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber;
 
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
@@ -28,18 +28,12 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSou
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import 
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.state.cluster.ClusterState;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import org.apache.shardingsphere.infra.state.instance.InstanceState;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
@@ -53,15 +47,12 @@ import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -69,11 +60,9 @@ import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-class StateChangedSubscriberTest {
+class QualifiedDataSourceSubscriberTest {
     
-    private StateChangedSubscriber subscriber;
-    
-    private ContextManager contextManager;
+    private QualifiedDataSourceSubscriber subscriber;
     
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ShardingSphereDatabase database;
@@ -81,11 +70,22 @@ class StateChangedSubscriberTest {
     @BeforeEach
     void setUp() throws SQLException {
         EventBusContext eventBusContext = new EventBusContext();
-        contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(), 
eventBusContext);
+        ContextManager contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(), 
eventBusContext);
         
contextManager.renewMetaDataContexts(MetaDataContextsFactory.create(contextManager.getPersistServiceFacade().getMetaDataPersistService(),
 new ShardingSphereMetaData(createDatabases(),
                 
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
                 new ConfigurationProperties(new Properties()))));
-        subscriber = new StateChangedSubscriber(contextManager);
+        subscriber = new QualifiedDataSourceSubscriber(contextManager);
+    }
+    
+    @Test
+    void assertRenewForDisableStateChanged() {
+        StaticDataSourceRuleAttribute ruleAttribute = 
mock(StaticDataSourceRuleAttribute.class);
+        
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
+        QualifiedDataSourceStateEvent event = new 
QualifiedDataSourceStateEvent(new QualifiedDataSource("db.readwrite_ds.ds_0"), 
new QualifiedDataSourceState(DataSourceState.DISABLED));
+        subscriber.renew(event);
+        verify(ruleAttribute).updateStatus(
+                argThat(qualifiedDataSource -> 
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
+                argThat(dataSourceState -> event.getStatus().getState() == 
dataSourceState));
     }
     
     private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
@@ -103,43 +103,4 @@ class StateChangedSubscriberTest {
         
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
         return Collections.singletonMap("db", database);
     }
-    
-    @Test
-    void assertRenewForDisableStateChanged() {
-        StaticDataSourceRuleAttribute ruleAttribute = 
mock(StaticDataSourceRuleAttribute.class);
-        
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
-        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDataSource("db.readwrite_ds.ds_0"), new 
QualifiedDataSourceState(DataSourceState.DISABLED));
-        subscriber.renew(event);
-        verify(ruleAttribute).updateStatus(
-                argThat(qualifiedDataSource -> 
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
-                argThat(dataSourceState -> event.getStatus().getStatus() == 
dataSourceState));
-    }
-    
-    @Test
-    void assertRenewClusterState() {
-        ClusterStateEvent mockClusterStateEvent = new 
ClusterStateEvent(ClusterState.READ_ONLY);
-        subscriber.renew(mockClusterStateEvent);
-        assertThat(contextManager.getStateContext().getClusterState(), 
is(ClusterState.READ_ONLY));
-    }
-    
-    @Test
-    void assertRenewInstanceState() {
-        ComputeNodeInstanceStateChangedEvent event = new 
ComputeNodeInstanceStateChangedEvent(
-                
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
 InstanceState.OK.name());
-        subscriber.renew(event);
-        
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getState().getCurrentState(),
 is(InstanceState.OK));
-    }
-    
-    @Test
-    void assertRenewInstanceWorkerIdEvent() {
-        subscriber.renew(new 
WorkerIdEvent(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
 0));
-        
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getWorkerId(),
 is(0));
-    }
-    
-    @Test
-    void assertRenewInstanceLabels() {
-        Collection<String> labels = Collections.singletonList("test");
-        subscriber.renew(new 
LabelsEvent(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
 labels));
-        
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getLabels(),
 is(labels));
-    }
 }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
index 67a11e39287..4a680d40f28 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import 
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
 import org.junit.jupiter.api.Test;
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
similarity index 81%
rename from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
rename to 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
index 79f2463444d..3c504f5364c 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
@@ -21,7 +21,8 @@ import 
org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher.QualifiedDataSourceWatcher;
 import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
@@ -31,35 +32,35 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-class QualifiedDataSourceChangedWatcherTest {
+class QualifiedDataSourceWatcherTest {
     
     @Test
     void assertCreateEnabledQualifiedDataSourceChangedEvent() {
-        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceWatcher().createGovernanceEvent(
                 new 
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
 "status: ENABLED\n", Type.ADDED));
         assertTrue(actual.isPresent());
-        StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent) 
actual.get();
+        QualifiedDataSourceStateEvent actualEvent = 
(QualifiedDataSourceStateEvent) actual.get();
         assertThat(actualEvent.getQualifiedDataSource().getDatabaseName(), 
is("replica_query_db"));
         assertThat(actualEvent.getQualifiedDataSource().getGroupName(), 
is("readwrite_ds"));
         assertThat(actualEvent.getQualifiedDataSource().getDataSourceName(), 
is("replica_ds_0"));
-        assertThat(actualEvent.getStatus().getStatus(), 
is(DataSourceState.ENABLED));
+        assertThat(actualEvent.getStatus().getState(), 
is(DataSourceState.ENABLED));
     }
     
     @Test
     void assertCreateDisabledQualifiedDataSourceChangedEvent() {
-        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceWatcher().createGovernanceEvent(
                 new 
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
 "status: DISABLED\n", Type.DELETED));
         assertTrue(actual.isPresent());
-        StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent) 
actual.get();
+        QualifiedDataSourceStateEvent actualEvent = 
(QualifiedDataSourceStateEvent) actual.get();
         assertThat(actualEvent.getQualifiedDataSource().getDatabaseName(), 
is("replica_query_db"));
         assertThat(actualEvent.getQualifiedDataSource().getGroupName(), 
is("readwrite_ds"));
         assertThat(actualEvent.getQualifiedDataSource().getDataSourceName(), 
is("replica_ds_0"));
-        assertThat(actualEvent.getStatus().getStatus(), 
is(DataSourceState.DISABLED));
+        assertThat(actualEvent.getStatus().getState(), 
is(DataSourceState.DISABLED));
     }
     
     @Test
     void assertCreateEmptyEvent() {
-        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+        Optional<GovernanceEvent> actual = new 
QualifiedDataSourceWatcher().createGovernanceEvent(
                 new 
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
 "", Type.ADDED));
         assertFalse(actual.isPresent());
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 46e06cd4ee1..1b10939272d 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -24,12 +24,9 @@ import 
org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.infra.state.instance.InstanceState;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -39,10 +36,8 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,14 +52,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -104,17 +96,6 @@ class StateChangedSubscriberTest {
         return Collections.singletonMap("db", database);
     }
     
-    @Test
-    void assertRenewForDisableStateChanged() {
-        StaticDataSourceRuleAttribute ruleAttribute = 
mock(StaticDataSourceRuleAttribute.class);
-        
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
-        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDataSource("db.readwrite_ds.ds_0"), new 
QualifiedDataSourceState(DataSourceState.DISABLED));
-        subscriber.renew(event);
-        verify(ruleAttribute).updateStatus(
-                argThat(qualifiedDataSource -> 
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
-                argThat(dataSourceState -> event.getStatus().getStatus() == 
dataSourceState));
-    }
-    
     @Test
     void assertRenewClusterState() {
         ClusterStateEvent mockClusterStateEvent = new 
ClusterStateEvent(ClusterState.READ_ONLY);


Reply via email to