This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c16084f7fb7 Remove event bus for lock and unlock cluster (#31323)
c16084f7fb7 is described below

commit c16084f7fb7f9b96a5a1263ec05c4ae31579d2f4
Author: Haoran Meng <[email protected]>
AuthorDate: Tue May 21 14:22:16 2024 +0800

    Remove event bus for lock and unlock cluster (#31323)
---
 .../shardingsphere/mode/state/StateContext.java    |  9 +++++
 .../shardingsphere/mode/state/StateService.java    |  4 +-
 .../mode/state/StateContextTest.java               |  7 ++++
 .../mode/state/StateServiceTest.java               | 10 -----
 .../cluster/ClusterContextManagerBuilder.java      |  2 -
 .../cluster/subscriber/ClusterStateSubscriber.java | 44 ----------------------
 .../subscriber/ClusterStateSubscriberTest.java     | 44 ----------------------
 .../ral/updatable/UnlockClusterExecutor.java       |  3 +-
 .../lock/impl/ClusterReadWriteLockStrategy.java    |  3 +-
 .../lock/impl/ClusterWriteLockStrategy.java        |  3 +-
 10 files changed, 20 insertions(+), 109 deletions(-)

diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
index 0e0371242fa..ca0f39c2333 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateContext.java
@@ -50,4 +50,13 @@ public final class StateContext {
     public void switchCurrentClusterState(final ClusterState state) {
         currentClusterState.set(state);
     }
+    
+    /**
+     * Update cluster state.
+     *
+     * @param state cluster state
+     */
+    public void updateClusterState(final ClusterState state) {
+        stateService.persist(state);
+    }
 }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
index b9eca600be4..818bff9a36d 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/state/StateService.java
@@ -39,9 +39,7 @@ public final class StateService {
      * @param state cluster state
      */
     public void persist(final ClusterState state) {
-        if 
(Strings.isNullOrEmpty(repository.getDirectly(ComputeNode.getClusterStateNodePath())))
 {
-            repository.persist(ComputeNode.getClusterStateNodePath(), 
state.name());
-        }
+        repository.persist(ComputeNode.getClusterStateNodePath(), 
state.name());
     }
     
     /**
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
index 21ba69898eb..1eddf34024f 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateContextTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 class StateContextTest {
     
@@ -39,4 +40,10 @@ class StateContextTest {
         stateContext.switchCurrentClusterState(ClusterState.UNAVAILABLE);
         assertThat(stateContext.getCurrentClusterState(), 
is(ClusterState.UNAVAILABLE));
     }
+    
+    @Test
+    void assertUpdateClusterState() {
+        stateContext.updateClusterState(ClusterState.OK);
+        verify(stateContext.getStateService()).persist(ClusterState.OK);
+    }
 }
diff --git 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
index 354674cb9e9..05c13bc5d7c 100644
--- 
a/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
+++ 
b/mode/core/src/test/java/org/apache/shardingsphere/mode/state/StateServiceTest.java
@@ -25,9 +25,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
 
 @ExtendWith(MockitoExtension.class)
 class StateServiceTest {
@@ -42,14 +40,6 @@ class StateServiceTest {
         verify(repository).persist(ComputeNode.getClusterStateNodePath(), 
ClusterState.OK.name());
     }
     
-    @Test
-    void assertPersistClusterStateWithPath() {
-        StateService stateService = new StateService(repository);
-        
when(repository.getDirectly("/nodes/compute_nodes/status")).thenReturn(ClusterState.OK.name());
-        stateService.persist(ClusterState.OK);
-        verify(repository, 
times(0)).persist(ComputeNode.getClusterStateNodePath(), 
ClusterState.OK.name());
-    }
-    
     @Test
     void assertLoadClusterState() {
         new StateService(repository).load();
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 79fd611638e..b478531f2a3 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -37,7 +37,6 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLoc
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber.ClusterStateSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber.ComputeNodeStatusSubscriber;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.QualifiedDataSourceStatusSubscriber;
@@ -106,7 +105,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     // TODO remove the method, only keep ZooKeeper's events, remove all 
decouple events
     private void createSubscribers(final EventBusContext eventBusContext, 
final ClusterPersistRepository repository) {
         eventBusContext.register(new ComputeNodeStatusSubscriber(repository));
-        eventBusContext.register(new ClusterStateSubscriber(repository));
         eventBusContext.register(new 
QualifiedDataSourceStatusSubscriber(repository));
         eventBusContext.register(new ClusterProcessSubscriber(repository, 
eventBusContext));
         eventBusContext.register(new 
ShardingSphereSchemaDataRegistrySubscriber(repository));
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java
deleted file mode 100644
index 84ee0bb09e3..00000000000
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriber.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
-
-import com.google.common.eventbus.Subscribe;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-
-/**
- * Cluster state subscriber.
- */
-@RequiredArgsConstructor
-public final class ClusterStateSubscriber implements EventSubscriber {
-    
-    private final ClusterPersistRepository repository;
-    
-    /**
-     * Update cluster status.
-     *
-     * @param event cluster status changed event
-     */
-    @Subscribe
-    public void update(final ClusterStateChangedEvent event) {
-        repository.persist(ComputeNode.getClusterStateNodePath(), 
event.getState().name());
-    }
-}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java
deleted file mode 100644
index 02ddf8c68d4..00000000000
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStateSubscriberTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;
-
-import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.mockito.Mockito.verify;
-
-@ExtendWith(MockitoExtension.class)
-class ClusterStateSubscriberTest {
-    
-    @Mock
-    private ClusterPersistRepository repository;
-    
-    @Test
-    void assertUpdate() {
-        ClusterStateSubscriber clusterStateSubscriber = new 
ClusterStateSubscriber(repository);
-        ClusterStateChangedEvent event = new 
ClusterStateChangedEvent(ClusterState.OK);
-        clusterStateSubscriber.update(event);
-        verify(repository).persist(ComputeNode.getClusterStateNodePath(), 
ClusterState.OK.name());
-    }
-}
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
index 8ed42981911..a6d9764f7bc 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutor.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.infra.state.cluster.ClusterState;
 import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
 
 /**
  * Unlock cluster executor.
@@ -44,7 +43,7 @@ public final class UnlockClusterExecutor implements 
DistSQLUpdateExecutor<Unlock
         if (lockContext.tryLock(lockDefinition, 3000L)) {
             try {
                 checkState(contextManager);
-                
contextManager.getComputeNodeInstanceContext().getEventBusContext().post(new 
ClusterStateChangedEvent(ClusterState.OK));
+                
contextManager.getStateContext().updateClusterState(ClusterState.OK);
                 // TODO unlock snapshot info if locked
             } finally {
                 lockContext.unlock(lockDefinition);
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
index cb9dee9d592..6e0bfd629df 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterReadWriteLockStrategy.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.proxy.backend.lock.impl;
 
 import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
 
@@ -29,7 +28,7 @@ public class ClusterReadWriteLockStrategy implements 
ClusterLockStrategy {
     
     @Override
     public void lock() {
-        
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
 ClusterStateChangedEvent(ClusterState.UNAVAILABLE));
+        
ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.UNAVAILABLE);
     }
     
     @Override
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
index 48dd3449e21..3bc027c48f1 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/lock/impl/ClusterWriteLockStrategy.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.proxy.backend.lock.impl;
 
 import org.apache.shardingsphere.infra.state.cluster.ClusterState;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateChangedEvent;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
 
@@ -29,7 +28,7 @@ public class ClusterWriteLockStrategy implements 
ClusterLockStrategy {
     
     @Override
     public void lock() {
-        
ProxyContext.getInstance().getContextManager().getComputeNodeInstanceContext().getEventBusContext().post(new
 ClusterStateChangedEvent(ClusterState.READ_ONLY));
+        
ProxyContext.getInstance().getContextManager().getStateContext().updateClusterState(ClusterState.READ_ONLY);
     }
     
     @Override

Reply via email to