This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c16084f7fb7 Remove event bus for lock and unlock cluster (#31323)
c16084f7fb7 is described below
commit c16084f7fb7f9b96a5a1263ec05c4ae31579d2f4
Author: Haoran Meng <[email protected]>
AuthorDate: Tue May 21 14:22:16 2024 +0800
Remove event bus for lock and unlock cluster (#31323)
---
.../shardingsphere/mode/state/StateContext.java | 9 +++++
.../shardingsphere/mode/state/StateService.java | 4 +-
.../mode/state/StateContextTest.java | 7 ++++
.../mode/state/StateServiceTest.java | 10 -----
.../cluster/ClusterContextManagerBuilder.java | 2 -
.../cluster/subscriber/ClusterStateSubscriber.java | 44 ----------------------
.../subscriber/ClusterStateSubscriberTest.java | 44 ----------------------
.../ral/updatable/UnlockClusterExecutor.java | 3 +-
.../lock/impl/ClusterReadWriteLockStrategy.java | 3 +-
.../lock/impl/ClusterWriteLockStrategy.java | 3 +-
10 files changed, 20 insertions(+), 109 deletions(-)
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
index 0e0371242fa..ca0f39c2333 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
@@ -50,4 +50,13 @@ public final class StateContext {
public void switchCurrentClusterState(final ClusterState state) {
currentClusterState.set(state);
}
+
+ /**
+ * Update cluster state.
+ *
+ * @param state cluster state
+ */
+ public void updateClusterState(final ClusterState state) {
+ stateService.persist(state);
+ }
}
diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
index b9eca600be4..818bff9a36d 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
@@ -39,9 +39,7 @@ public final class StateService {
* @param state cluster state
*/
public void persist(final ClusterState state) {
- if (Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStateNodePath()))) {
- repository.persist(ComputeNode.getClusterStateNodePath(), state.name());
- }
+ repository.persist(ComputeNode.getClusterStateNodePath(), state.name());
}
/**
diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
index 21ba69898eb..1eddf34024f 100644
--- a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
+++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
class StateContextTest {
@@ -39,4 +40,10 @@ class StateContextTest {
stateContext.switchCurrentClusterState(ClusterState.UNAVAILABLE);
assertThat(stateContext.getCurrentClusterState(),
is(ClusterState.UNAVAILABLE));
}
+
+ @Test
+ void assertUpdateClusterState() {
+ stateContext.updateClusterState(ClusterState.OK);
+ verify(stateContext.getStateService()).persist(ClusterState.OK);
+ }
}
diff --git a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
index 354674cb9e9..05c13bc5d7c 100644
--- a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
+++ b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
@@ -25,9 +25,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
@ExtendWith(MockitoExtension.class)
class StateServiceTest {
@@ -42,14 +40,6 @@ class StateServiceTest {
verify(repository).persist(ComputeNode.getClusterStateNodePath(),
ClusterState.OK.name());
}
- @Test
- void assertPersistClusterStateWithPath() {
- StateService stateService = new StateService(repository);
-
when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name());
- stateService.persist(ClusterState.OK);
- verify(repository,
times(0)).persist(ComputeNode.getClusterStateNodePath(),
ClusterState.OK.name());
- }
-
@Test
void assertLoadClusterState() {
new StateService(repository).load();
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 79fd611638e..b478531f2a3 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -37,7 +37,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLoc
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStateSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
@@ -106,7 +105,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
// TODO remove the method, only keep ZooKeeper's events, remove all
decouple events
private void createSubscribers(final EventBusContext eventBusContext,
final ClusterPersistRepository repository) {
eventBusContext.register(new ComputeNodeStatusSubscriber(repository));
- eventBusContext.register(new ClusterStateSubscriber(repository));
eventBusContext.register(new
QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository,
eventBusContext));
eventBusContext.register(new
ShardingSphereSchemaDataRegistrySubscriber(repository));
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java
deleted file mode 100644
index 84ee0bb09e3..00000000000
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-/**
- * Cluster state subscriber.
- */
-@RequiredArgsConstructor
-public final class ClusterStateSubscriber implements EventSubscriber {
-
- private final ClusterPersistRepository repository;
-
- /**
- * Update cluster status.
- *
- * @param event cluster status changed event
- */
- @Subscribe
- public void update(final ClusterStateChangedEvent event) {
- repository.persist(ComputeNode.getClusterStateNodePath(),
event.getState().name());
- }
-}
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java
deleted file mode 100644
index 02ddf8c68d4..00000000000
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
-
-import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.mockito.Mockito.verify;
-
-@ExtendWith(MockitoExtension.class)
-class ClusterStateSubscriberTest {
-
- @Mock
- private ClusterPersistRepository repository;
-
- @Test
- void assertUpdate() {
- ClusterStateSubscriber clusterStateSubscriber = new
ClusterStateSubscriber(repository);
- ClusterStateChangedEvent event = new
ClusterStateChangedEvent(ClusterState.OK);
- clusterStateSubscriber.update(event);
- verify(repository).persist(ComputeNode.getClusterStateNodePath(),
ClusterState.OK.name());
- }
-}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
index 8ed42981911..a6d9764f7bc 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
/**
* Unlock cluster executor.
@@ -44,7 +43,7 @@ public final class UnlockClusterExecutor implements
DistSQLUpdateExecutor<Unlock
if (lockContext.tryLock(lockDefinition, 3000L)) {
try {
checkState(contextManager);
- contextManager.getComputeNodeInstanceContext().getEventBusContext().post(new ClusterStateChangedEvent(ClusterState.OK));
+ contextManager.getStateContext().updateClusterState(ClusterState.OK);
// TODO unlock snapshot info if locked
} finally {
lockContext.unlock(lockDefinition);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
index cb9dee9d592..6e0bfd629df 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.lock.impl;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
@@ -29,7 +28,7 @@ public class ClusterReadWriteLockStrategy implements
ClusterLockStrategy {
@Override
public void lock() {
- ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new ClusterStateChangedEvent(ClusterState.UNAVAILABLE));
+ ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.UNAVAILABLE);
}
@Override
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
index 48dd3449e21..3bc027c48f1 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.lock.impl;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
@@ -29,7 +28,7 @@ public class ClusterWriteLockStrategy implements
ClusterLockStrategy {
@Override
public void lock() {
-
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
ClusterStateChangedEvent(ClusterState.READ_ONLY));
+
ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.READ_ONLY);
}
@Override