This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4de2c40ff8b Refactor QualifiedDataSourceWatcher and Subscriber (#31625)
4de2c40ff8b is described below
commit 4de2c40ff8b028f22fc3eee7641d4f440868a676
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jun 7 17:33:48 2024 +0800
Refactor QualifiedDataSourceWatcher and Subscriber (#31625)
* Refactor QualifiedDataSourceWatcher and Subscriber
* Refactor QualifiedDataSourceWatcher and Subscriber
---
.../mode/storage/QualifiedDataSourceState.java | 2 +-
.../yaml/YamlQualifiedDataSourceStatusSwapper.java | 2 +-
.../cluster/ClusterContextManagerBuilder.java | 2 +-
.../event/QualifiedDataSourceStateEvent.java} | 4 +-
.../subscriber/QualifiedDataSourceSubscriber.java | 53 ++++++++++++++++
.../QualifiedDataSourceStatusSubscriber.java | 2 +-
.../watcher/QualifiedDataSourceWatcher.java} | 12 ++--
.../subscriber/ClusterEventSubscriberRegistry.java | 4 +-
.../subscriber/StateChangedSubscriber.java | 21 -------
....cluster.coordinator.registry.GovernanceWatcher | 2 +-
.../QualifiedDataSourceSubscriberTest.java} | 73 +++++-----------------
.../QualifiedDataSourceStateSubscriberTest.java | 1 +
...st.java => QualifiedDataSourceWatcherTest.java} | 19 +++---
.../subscriber/StateChangedSubscriberTest.java | 19 ------
14 files changed, 97 insertions(+), 119 deletions(-)
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
index c7401ce5159..48cf886986e 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/QualifiedDataSourceState.java
@@ -28,5 +28,5 @@ import
org.apache.shardingsphere.infra.state.datasource.DataSourceState;
@Getter
public final class QualifiedDataSourceState {
- private final DataSourceState status;
+ private final DataSourceState state;
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
index f75545cdff8..7a99ad42671 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/storage/yaml/YamlQualifiedDataSourceStatusSwapper.java
@@ -29,7 +29,7 @@ public final class YamlQualifiedDataSourceStatusSwapper
implements YamlConfigura
@Override
public YamlQualifiedDataSourceStatus swapToYamlConfiguration(final
QualifiedDataSourceState data) {
YamlQualifiedDataSourceStatus result = new
YamlQualifiedDataSourceStatus();
- result.setStatus(data.getStatus().name());
+ result.setStatus(data.getState().name());
return result;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index dc790d7f747..67ffdc0701b 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -35,7 +35,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaD
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.MetaDataChangedListener;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
similarity index 92%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
index 73f8bc70ffc..4a2ee61f70f 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/event/StorageNodeChangedEvent.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/event/QualifiedDataSourceStateEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
*/
@RequiredArgsConstructor
@Getter
-public final class StorageNodeChangedEvent implements GovernanceEvent {
+public final class QualifiedDataSourceStateEvent implements GovernanceEvent {
private final QualifiedDataSource qualifiedDataSource;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java
new file mode 100644
index 00000000000..0444ff27f5c
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriber.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
+import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
+import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
+
+/**
+ * Qualified data source subscriber.
+ */
+@RequiredArgsConstructor
+public class QualifiedDataSourceSubscriber implements EventSubscriber {
+
+ private final ContextManager contextManager;
+
+ /**
+ * Renew disabled data source names.
+ *
+ * @param event qualified data source state event
+ */
+ @Subscribe
+ public synchronized void renew(final QualifiedDataSourceStateEvent event) {
+ QualifiedDataSource qualifiedDataSource =
event.getQualifiedDataSource();
+ if
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDataSource.getDatabaseName()))
{
+ return;
+ }
+ ShardingSphereDatabase database =
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDataSource.getDatabaseName());
+ for (StaticDataSourceRuleAttribute each :
database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
+ each.updateStatus(qualifiedDataSource,
event.getStatus().getState());
+ }
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
similarity index 97%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
index 1dd359778a6..21ff04409c5 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/internal/QualifiedDataSourceStatusSubscriber.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal;
import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
similarity index 83%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
index aeb4e05fd54..9097c81cb0c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/watcher/QualifiedDataSourceWatcher.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher;
import com.google.common.base.Strings;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
import
org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
@@ -36,9 +36,9 @@ import java.util.Collections;
import java.util.Optional;
/**
- * Qualified data source changed watcher.
+ * Qualified data source watcher.
*/
-public final class QualifiedDataSourceChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
+public final class QualifiedDataSourceWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
public Collection<String> getWatchingKeys() {
@@ -57,8 +57,8 @@ public final class QualifiedDataSourceChangedWatcher
implements GovernanceWatche
}
Optional<QualifiedDataSource> qualifiedDataSource =
QualifiedDataSourceNode.extractQualifiedDataSource(event.getKey());
if (qualifiedDataSource.isPresent()) {
- QualifiedDataSourceState status = new
YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
YamlQualifiedDataSourceStatus.class));
- return Optional.of(new
StorageNodeChangedEvent(qualifiedDataSource.get(), status));
+ QualifiedDataSourceState state = new
YamlQualifiedDataSourceStatusSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
YamlQualifiedDataSourceStatus.class));
+ return Optional.of(new
QualifiedDataSourceStateEvent(qualifiedDataSource.get(), state));
}
return Optional.empty();
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
index e295c2fda38..11b360f857d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.compute.online.subscriber.ComputeNodeOnlineSubscriber;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.QualifiedDataSourceSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
@@ -40,6 +41,7 @@ public final class ClusterEventSubscriberRegistry extends
EventSubscriberRegistr
new DatabaseChangedSubscriber(contextManager),
new ProcessListChangedSubscriber(contextManager, repository),
new CacheEvictedSubscriber(),
- new ComputeNodeOnlineSubscriber(contextManager));
+ new ComputeNodeOnlineSubscriber(contextManager),
+ new QualifiedDataSourceSubscriber(contextManager));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index f7eae73b507..226cbca0e90 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -18,16 +18,12 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import com.google.common.eventbus.Subscribe;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
-import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
/**
* State changed subscriber.
@@ -41,23 +37,6 @@ public final class StateChangedSubscriber implements
EventSubscriber {
this.contextManager = contextManager;
}
- /**
- * Renew disabled data source names.
- *
- * @param event Storage node changed event
- */
- @Subscribe
- public synchronized void renew(final StorageNodeChangedEvent event) {
- QualifiedDataSource qualifiedDataSource =
event.getQualifiedDataSource();
- if
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDataSource.getDatabaseName()))
{
- return;
- }
- ShardingSphereDatabase database =
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDataSource.getDatabaseName());
- for (StaticDataSourceRuleAttribute each :
database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
- each.updateStatus(qualifiedDataSource,
event.getStatus().getStatus());
- }
- }
-
/**
* Renew cluster state.
*
diff --git
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 30feb1f9011..95bc0d0c8d1 100644
---
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -15,7 +15,7 @@
# limitations under the License.
#
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher.QualifiedDataSourceChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher.QualifiedDataSourceWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
similarity index 67%
copy from
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
copy to
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
index 46e06cd4ee1..2afaca2611e 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/nodes/storage/subscriber/QualifiedDataSourceSubscriberTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
@@ -28,18 +28,12 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSou
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
-import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
@@ -53,15 +47,12 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -69,11 +60,9 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-class StateChangedSubscriberTest {
+class QualifiedDataSourceSubscriberTest {
- private StateChangedSubscriber subscriber;
-
- private ContextManager contextManager;
+ private QualifiedDataSourceSubscriber subscriber;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereDatabase database;
@@ -81,11 +70,22 @@ class StateChangedSubscriberTest {
@BeforeEach
void setUp() throws SQLException {
EventBusContext eventBusContext = new EventBusContext();
- contextManager = new
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(),
eventBusContext);
+ ContextManager contextManager = new
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(),
eventBusContext);
contextManager.renewMetaDataContexts(MetaDataContextsFactory.create(contextManager.getPersistServiceFacade().getMetaDataPersistService(),
new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(),
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
- subscriber = new StateChangedSubscriber(contextManager);
+ subscriber = new QualifiedDataSourceSubscriber(contextManager);
+ }
+
+ @Test
+ void assertRenewForDisableStateChanged() {
+ StaticDataSourceRuleAttribute ruleAttribute =
mock(StaticDataSourceRuleAttribute.class);
+
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
+ QualifiedDataSourceStateEvent event = new
QualifiedDataSourceStateEvent(new QualifiedDataSource("db.readwrite_ds.ds_0"),
new QualifiedDataSourceState(DataSourceState.DISABLED));
+ subscriber.renew(event);
+ verify(ruleAttribute).updateStatus(
+ argThat(qualifiedDataSource ->
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
+ argThat(dataSourceState -> event.getStatus().getState() ==
dataSourceState));
}
private ContextManagerBuilderParameter
createContextManagerBuilderParameter() {
@@ -103,43 +103,4 @@ class StateChangedSubscriberTest {
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
return Collections.singletonMap("db", database);
}
-
- @Test
- void assertRenewForDisableStateChanged() {
- StaticDataSourceRuleAttribute ruleAttribute =
mock(StaticDataSourceRuleAttribute.class);
-
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
- StorageNodeChangedEvent event = new StorageNodeChangedEvent(new
QualifiedDataSource("db.readwrite_ds.ds_0"), new
QualifiedDataSourceState(DataSourceState.DISABLED));
- subscriber.renew(event);
- verify(ruleAttribute).updateStatus(
- argThat(qualifiedDataSource ->
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
- argThat(dataSourceState -> event.getStatus().getStatus() ==
dataSourceState));
- }
-
- @Test
- void assertRenewClusterState() {
- ClusterStateEvent mockClusterStateEvent = new
ClusterStateEvent(ClusterState.READ_ONLY);
- subscriber.renew(mockClusterStateEvent);
- assertThat(contextManager.getStateContext().getClusterState(),
is(ClusterState.READ_ONLY));
- }
-
- @Test
- void assertRenewInstanceState() {
- ComputeNodeInstanceStateChangedEvent event = new
ComputeNodeInstanceStateChangedEvent(
-
contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
InstanceState.OK.name());
- subscriber.renew(event);
-
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getState().getCurrentState(),
is(InstanceState.OK));
- }
-
- @Test
- void assertRenewInstanceWorkerIdEvent() {
- subscriber.renew(new
WorkerIdEvent(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
0));
-
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getWorkerId(),
is(0));
- }
-
- @Test
- void assertRenewInstanceLabels() {
- Collection<String> labels = Collections.singletonList("test");
- subscriber.renew(new
LabelsEvent(contextManager.getComputeNodeInstanceContext().getInstance().getMetaData().getId(),
labels));
-
assertThat(contextManager.getComputeNodeInstanceContext().getInstance().getLabels(),
is(labels));
- }
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
index 67a11e39287..4a680d40f28 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStateSubscriberTest.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import
org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.subscriber.internal.QualifiedDataSourceStatusSubscriber;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
import org.junit.jupiter.api.Test;
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
similarity index 81%
rename from
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
rename to
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
index 79f2463444d..3c504f5364c 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceChangedWatcherTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/QualifiedDataSourceWatcherTest.java
@@ -21,7 +21,8 @@ import
org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.event.QualifiedDataSourceStateEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.nodes.storage.watcher.QualifiedDataSourceWatcher;
import org.junit.jupiter.api.Test;
import java.util.Optional;
@@ -31,35 +32,35 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class QualifiedDataSourceChangedWatcherTest {
+class QualifiedDataSourceWatcherTest {
@Test
void assertCreateEnabledQualifiedDataSourceChangedEvent() {
- Optional<GovernanceEvent> actual = new
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+ Optional<GovernanceEvent> actual = new
QualifiedDataSourceWatcher().createGovernanceEvent(
new
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
"status: ENABLED\n", Type.ADDED));
assertTrue(actual.isPresent());
- StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent)
actual.get();
+ QualifiedDataSourceStateEvent actualEvent =
(QualifiedDataSourceStateEvent) actual.get();
assertThat(actualEvent.getQualifiedDataSource().getDatabaseName(),
is("replica_query_db"));
assertThat(actualEvent.getQualifiedDataSource().getGroupName(),
is("readwrite_ds"));
assertThat(actualEvent.getQualifiedDataSource().getDataSourceName(),
is("replica_ds_0"));
- assertThat(actualEvent.getStatus().getStatus(),
is(DataSourceState.ENABLED));
+ assertThat(actualEvent.getStatus().getState(),
is(DataSourceState.ENABLED));
}
@Test
void assertCreateDisabledQualifiedDataSourceChangedEvent() {
- Optional<GovernanceEvent> actual = new
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+ Optional<GovernanceEvent> actual = new
QualifiedDataSourceWatcher().createGovernanceEvent(
new
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
"status: DISABLED\n", Type.DELETED));
assertTrue(actual.isPresent());
- StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent)
actual.get();
+ QualifiedDataSourceStateEvent actualEvent =
(QualifiedDataSourceStateEvent) actual.get();
assertThat(actualEvent.getQualifiedDataSource().getDatabaseName(),
is("replica_query_db"));
assertThat(actualEvent.getQualifiedDataSource().getGroupName(),
is("readwrite_ds"));
assertThat(actualEvent.getQualifiedDataSource().getDataSourceName(),
is("replica_ds_0"));
- assertThat(actualEvent.getStatus().getStatus(),
is(DataSourceState.DISABLED));
+ assertThat(actualEvent.getStatus().getState(),
is(DataSourceState.DISABLED));
}
@Test
void assertCreateEmptyEvent() {
- Optional<GovernanceEvent> actual = new
QualifiedDataSourceChangedWatcher().createGovernanceEvent(
+ Optional<GovernanceEvent> actual = new
QualifiedDataSourceWatcher().createGovernanceEvent(
new
DataChangedEvent("/nodes/qualified_data_sources/replica_query_db.readwrite_ds.replica_ds_0",
"", Type.ADDED));
assertFalse(actual.isPresent());
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 46e06cd4ee1..1b10939272d 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -24,12 +24,9 @@ import
org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -39,10 +36,8 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeInstanceStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,14 +52,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -104,17 +96,6 @@ class StateChangedSubscriberTest {
return Collections.singletonMap("db", database);
}
- @Test
- void assertRenewForDisableStateChanged() {
- StaticDataSourceRuleAttribute ruleAttribute =
mock(StaticDataSourceRuleAttribute.class);
-
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
- StorageNodeChangedEvent event = new StorageNodeChangedEvent(new
QualifiedDataSource("db.readwrite_ds.ds_0"), new
QualifiedDataSourceState(DataSourceState.DISABLED));
- subscriber.renew(event);
- verify(ruleAttribute).updateStatus(
- argThat(qualifiedDataSource ->
Objects.equals(event.getQualifiedDataSource(), qualifiedDataSource)),
- argThat(dataSourceState -> event.getStatus().getStatus() ==
dataSourceState));
- }
-
@Test
void assertRenewClusterState() {
ClusterStateEvent mockClusterStateEvent = new
ClusterStateEvent(ClusterState.READ_ONLY);