This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new edbcd78cce0 Refactor ClusterStatusService.loadClusterState() (#31248)
edbcd78cce0 is described below
commit edbcd78cce071b2c50730357d3f69b46a9593d89
Author: Liang Zhang <[email protected]>
AuthorDate: Thu May 16 19:04:42 2024 +0800
Refactor ClusterStatusService.loadClusterState() (#31248)
* Refactor ClusterStatusService.loadClusterState()
* Refactor ClusterStatusService.loadClusterState()
* Refactor ClusterStatusService.loadClusterState()
---
.../shardingsphere/mode/manager/ContextManager.java | 9 +++------
.../mode/manager/ContextManagerTest.java | 2 +-
.../cluster/ClusterContextManagerBuilder.java | 14 ++++++++++----
.../status/cluster/event/ClusterStateEvent.java | 3 ++-
.../cluster/service/ClusterStatusService.java | 13 ++++++++-----
.../cluster/watcher/ClusterStateChangedWatcher.java | 21 +++++++++++++++------
.../subscriber/StateChangedSubscriber.java | 2 +-
.../cluster/service/ClusterStatusServiceTest.java | 6 +++---
.../watcher/ClusterStateChangedWatcherTest.java | 6 +++---
.../subscriber/StateChangedSubscriberTest.java | 2 +-
10 files changed, 47 insertions(+), 31 deletions(-)
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index ebd67de7818..d4e06010307 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -276,13 +276,10 @@ public final class ContextManager implements AutoCloseable {
/**
* Update cluster state.
*
- * @param status status
+ * @param clusterState cluster state
*/
- public void updateClusterState(final String status) {
- try {
- clusterStateContext.switchState(ClusterState.valueOf(status));
- } catch (final IllegalArgumentException ignore) {
- }
+ public void updateClusterState(final ClusterState clusterState) {
+ clusterStateContext.switchState(clusterState);
}
@Override
diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index 56c43061c4d..85ea02dfa70 100644
--- a/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -266,7 +266,7 @@ class ContextManagerTest {
@Test
void assertUpdateClusterStatus() {
- contextManager.updateClusterState("READ_ONLY");
+ contextManager.updateClusterState(ClusterState.READ_ONLY);
assertThat(contextManager.getClusterStateContext().getCurrentState(),
is(ClusterState.READ_ONLY));
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index f046d7bbcf4..b2e264eb9a5 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
@@ -43,6 +44,7 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import java.sql.SQLException;
+import java.util.Optional;
/**
* Cluster context manager builder.
@@ -63,6 +65,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
setContextManagerAware(result);
createSubscribers(eventBusContext, repository);
registerOnline(registryCenter, param, result);
+ setClusterStatus(registryCenter, result);
return result;
}
@@ -93,15 +96,18 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void registerOnline(final RegistryCenter registryCenter, final
ContextManagerBuilderParameter param, final ContextManager contextManager) {
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
- loadClusterStatus(registryCenter, contextManager);
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
new ClusterEventSubscriberRegistry(contextManager,
registryCenter).register();
}
- private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
- registryCenter.getClusterStatusService().persistClusterState(contextManager.getClusterStateContext().getCurrentState());
- contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
+ private void setClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
+ Optional<ClusterState> clusterState = registryCenter.getClusterStatusService().load();
+ if (clusterState.isPresent()) {
+ contextManager.updateClusterState(clusterState.get());
+ } else {
+ registryCenter.getClusterStatusService().persist(ClusterState.OK);
+ }
}
@Override
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
index 5a69a8f3b05..1518da048a4 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/event/ClusterStateEvent.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
/**
* Cluster state event.
@@ -28,5 +29,5 @@ import
org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
@Getter
public final class ClusterStateEvent implements GovernanceEvent {
- private final String status;
+ private final ClusterState clusterState;
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
index 07ede1342aa..19a29f473be 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusService.java
@@ -23,6 +23,8 @@ import
org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import java.util.Optional;
+
/**
* Cluster status service.
*/
@@ -36,18 +38,19 @@ public final class ClusterStatusService {
*
* @param state cluster state
*/
- public void persistClusterState(final ClusterState state) {
+ public void persist(final ClusterState state) {
if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStatusNodePath()))) {
repository.persist(ComputeNode.getClusterStatusNodePath(),
state.name());
}
}
/**
- * Load cluster status.
+ * Load cluster state.
*
- * @return cluster status
+ * @return cluster state
*/
- public String loadClusterStatus() {
- return repository.getDirectly(ComputeNode.getClusterStatusNodePath());
+ public Optional<ClusterState> load() {
+ String value = repository.getDirectly(ComputeNode.getClusterStatusNodePath());
+ return Strings.isNullOrEmpty(value) ? Optional.empty() : Optional.of(ClusterState.valueOf(value));
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
index 5d60abb8f4b..77dfa65e17f 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
@@ -18,12 +18,13 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
+import org.apache.shardingsphere.infra.state.cluster.ClusterState;
+import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
import java.util.Arrays;
import java.util.Collection;
@@ -47,9 +48,17 @@ public final class ClusterStateChangedWatcher implements
GovernanceWatcher<Gover
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final
DataChangedEvent event) {
- String clusterStatus = ComputeNode.getClusterStatusNodePath();
- return Strings.isNullOrEmpty(clusterStatus) || Type.DELETED == event.getType() || !event.getKey().equals(ComputeNode.getClusterStatusNodePath())
+ String clusterStatusPath = ComputeNode.getClusterStatusNodePath();
+ return Strings.isNullOrEmpty(clusterStatusPath) || Type.DELETED == event.getType() || !event.getKey().equals(ComputeNode.getClusterStatusNodePath())
? Optional.empty()
- : Optional.of(new ClusterStateEvent(event.getValue()));
+ : Optional.of(new ClusterStateEvent(getClusterState(event)));
+ }
+
+ private ClusterState getClusterState(final DataChangedEvent event) {
+ try {
+ return ClusterState.valueOf(event.getValue());
+ } catch (final IllegalArgumentException ignore) {
+ return ClusterState.OK;
+ }
}
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index c44a129bfb2..9a87ffb3c92 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -84,7 +84,7 @@ public final class StateChangedSubscriber implements
EventSubscriber {
*/
@Subscribe
public synchronized void renew(final ClusterStateEvent event) {
- contextManager.updateClusterState(event.getStatus());
+ contextManager.updateClusterState(event.getClusterState());
}
/**
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
index 15b1de83b95..fac728a5e3c 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/service/ClusterStatusServiceTest.java
@@ -38,7 +38,7 @@ class ClusterStatusServiceTest {
@Test
void assertPersistClusterStateWithoutPath() {
ClusterStatusService clusterStatusService = new
ClusterStatusService(repository);
- clusterStatusService.persistClusterState(ClusterState.OK);
+ clusterStatusService.persist(ClusterState.OK);
verify(repository).persist(ComputeNode.getClusterStatusNodePath(),
ClusterState.OK.name());
}
@@ -46,13 +46,13 @@ class ClusterStatusServiceTest {
void assertPersistClusterStateWithPath() {
ClusterStatusService clusterStatusService = new
ClusterStatusService(repository);
when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name());
- clusterStatusService.persistClusterState(ClusterState.OK);
+ clusterStatusService.persist(ClusterState.OK);
verify(repository,
times(0)).persist(ComputeNode.getClusterStatusNodePath(),
ClusterState.OK.name());
}
@Test
void assertLoadClusterStatus() {
- new ClusterStatusService(repository).loadClusterStatus();
+ new ClusterStatusService(repository).load();
verify(repository).getDirectly(ComputeNode.getClusterStatusNodePath());
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
index 27c462ac970..9d9eaa59698 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcherTest.java
@@ -37,7 +37,7 @@ class ClusterStateChangedWatcherTest {
Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
.createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/status", ClusterState.READ_ONLY.name(),
Type.UPDATED));
assertTrue(actual.isPresent());
- assertThat(((ClusterStateEvent) actual.get()).getStatus(), is(ClusterState.READ_ONLY.name()));
+ assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.READ_ONLY));
}
@Test
@@ -45,7 +45,7 @@ class ClusterStateChangedWatcherTest {
Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
.createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/status",
ClusterState.UNAVAILABLE.name(), Type.UPDATED));
assertTrue(actual.isPresent());
- assertThat(((ClusterStateEvent) actual.get()).getStatus(), is(ClusterState.UNAVAILABLE.name()));
+ assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.UNAVAILABLE));
}
@Test
@@ -53,6 +53,6 @@ class ClusterStateChangedWatcherTest {
Optional<GovernanceEvent> actual = new ClusterStateChangedWatcher()
.createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/status", ClusterState.OK.name(),
Type.UPDATED));
assertTrue(actual.isPresent());
- assertThat(((ClusterStateEvent) actual.get()).getStatus(), is(ClusterState.OK.name()));
+ assertThat(((ClusterStateEvent) actual.get()).getClusterState(), is(ClusterState.OK));
}
}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index f52d94d8078..2f51892339c 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -130,7 +130,7 @@ class StateChangedSubscriberTest {
@Test
void assertRenewClusterStatus() {
- ClusterStateEvent mockClusterStateEvent = new ClusterStateEvent("READ_ONLY");
+ ClusterStateEvent mockClusterStateEvent = new ClusterStateEvent(ClusterState.READ_ONLY);
subscriber.renew(mockClusterStateEvent);
assertThat(contextManager.getClusterStateContext().getCurrentState(),
is(ClusterState.READ_ONLY));
}