This is an automated email from the ASF dual-hosted git repository.
kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef76c0c Add lock center api & implement proxy state queue (#8373)
ef76c0c is described below
commit ef76c0c3bc67c8656819939e5ff9c0c4eb966e8f
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Nov 27 16:26:26 2020 +0800
Add lock center api & implement proxy state queue (#8373)
* Add lock center api
* Add lock center api
---
.../governance/core/facade/GovernanceFacade.java | 5 ++
.../governance/core/lock/LockCenter.java | 96 ++++++++++++++++++++++
.../lock/listener/GlobalLockChangedListener.java | 10 +--
.../node/{GlobalLockNode.java => LockNode.java} | 4 +-
.../governance/core/registry/RegistryCenter.java | 15 ----
.../core/registry/RegistryCenterNodeStatus.java | 7 +-
.../governance/core/state/GovernedState.java | 66 +++++++++++++++
.../{GlobalLockNodeTest.java => LockNodeTest.java} | 8 +-
.../core/registry/RegistryCenterTest.java | 8 --
.../GovernedStateTest.java} | 23 ++++--
10 files changed, 198 insertions(+), 44 deletions(-)
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
index 371e9aa..67b5d19 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/facade/GovernanceFacade.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import org.apache.shardingsphere.governance.core.config.ConfigCenter;
import
org.apache.shardingsphere.governance.core.facade.listener.GovernanceListenerManager;
import
org.apache.shardingsphere.governance.core.facade.repository.GovernanceRepositoryFacade;
+import org.apache.shardingsphere.governance.core.lock.LockCenter;
import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
import org.apache.shardingsphere.governance.core.state.GovernedStateContext;
import
org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
@@ -50,6 +51,9 @@ public final class GovernanceFacade implements AutoCloseable {
private GovernanceListenerManager listenerManager;
+ @Getter
+ private LockCenter lockCenter;
+
/**
* Initialize governance facade.
*
@@ -61,6 +65,7 @@ public final class GovernanceFacade implements AutoCloseable {
repositoryFacade = new GovernanceRepositoryFacade(config);
registryCenter = new
RegistryCenter(repositoryFacade.getRegistryRepository());
configCenter = new
ConfigCenter(repositoryFacade.getConfigurationRepository());
+ lockCenter = new LockCenter(repositoryFacade.getRegistryRepository(),
registryCenter);
listenerManager = new
GovernanceListenerManager(repositoryFacade.getRegistryRepository(),
repositoryFacade.getConfigurationRepository(),
schemaNames.isEmpty() ? configCenter.getAllSchemaNames() : schemaNames);
GovernedStateContext.startUp();
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java
new file mode 100644
index 0000000..41d1013
--- /dev/null
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/LockCenter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.lock;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
+import
org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
+import org.apache.shardingsphere.governance.core.lock.node.LockNode;
+import org.apache.shardingsphere.governance.core.registry.RegistryCenter;
+import
org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
+import org.apache.shardingsphere.governance.core.state.GovernedState;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Lock center.
+ */
+public final class LockCenter {
+
+ private final RegistryRepository registryRepository;
+
+ private final RegistryCenter registryCenter;
+
+ private final LockNode lockNode;
+
+ private final GovernedState governedState;
+
+ public LockCenter(final RegistryRepository registryRepository, final
RegistryCenter registryCenter) {
+ this.registryRepository = registryRepository;
+ this.registryCenter = registryCenter;
+ this.lockNode = new LockNode();
+ this.governedState = new GovernedState();
+ this.registryRepository.initLock(lockNode.getGlobalLockNodePath());
+ GovernanceEventBus.getInstance().register(this);
+ }
+
+ /**
+ * Lock instance after global lock added.
+ *
+ * @param event global lock added event
+ */
+ @Subscribe
+ public synchronized void lock(final GlobalLockAddedEvent event) {
+ if (Optional.of(event).isPresent()) {
+
registryCenter.persistInstanceData(governedState.addState(RegistryCenterNodeStatus.LOCKED).toString());
+ }
+ }
+
+ /**
+ * Unlock instance.
+ */
+ public void unlock() {
+
registryCenter.persistInstanceData(governedState.recoverState().toString());
+ }
+
+ /**
+ * Try to get global lock.
+ *
+ * @return true if get the lock, false if not
+ */
+ public boolean tryGlobalLock() {
+ // TODO timeout and retry
+ return registryRepository.tryLock(5, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Release global lock.
+ */
+ public void releaseGlobalLock() {
+ registryRepository.releaseLock();
+ }
+
+ /**
+ * Delete global lock.
+ */
+ public void deleteGlobalLock() {
+ registryRepository.delete(lockNode.getGlobalLockNodePath());
+ }
+}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
index 9351681..fedacf7 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/listener/GlobalLockChangedListener.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.governance.core.lock.listener;
import
org.apache.shardingsphere.governance.core.event.listener.PostGovernanceRepositoryEventListener;
import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
import
org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
-import org.apache.shardingsphere.governance.core.lock.node.GlobalLockNode;
+import org.apache.shardingsphere.governance.core.lock.node.LockNode;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import
org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
@@ -32,15 +32,15 @@ import java.util.Optional;
*/
public final class GlobalLockChangedListener extends
PostGovernanceRepositoryEventListener<GovernanceEvent> {
- private final GlobalLockNode globalLockNode;
+ private final LockNode lockNode;
public GlobalLockChangedListener(final RegistryRepository
registryRepository) {
- super(registryRepository, Collections.singleton(new
GlobalLockNode().getGlobalLockNodePath()));
- globalLockNode = new GlobalLockNode();
+ super(registryRepository, Collections.singleton(new
LockNode().getGlobalLockNodePath()));
+ lockNode = new LockNode();
}
@Override
protected Optional<GovernanceEvent> createEvent(final DataChangedEvent
event) {
- return event.getKey().equals(globalLockNode.getGlobalLockNodePath()) ?
Optional.of(new GlobalLockAddedEvent()) : Optional.empty();
+ return event.getKey().equals(lockNode.getGlobalLockNodePath()) ?
Optional.of(new GlobalLockAddedEvent()) : Optional.empty();
}
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNode.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
similarity index 95%
rename from
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNode.java
rename to
shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
index ae5045e..ca3c542 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNode.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/node/LockNode.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.governance.core.lock.node;
import com.google.common.base.Joiner;
/**
- * Global lock node.
+ * Lock node.
*/
-public final class GlobalLockNode {
+public final class LockNode {
private static final String GLOBAL_LOCK_NODE = "glock";
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 6951237..a22f0ec 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -18,14 +18,11 @@
package org.apache.shardingsphere.governance.core.registry;
import com.google.common.base.Strings;
-import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
-import
org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
import
org.apache.shardingsphere.governance.core.registry.instance.GovernanceInstance;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import java.util.Collection;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -94,16 +91,4 @@ public final class RegistryCenter {
private String getDataSourceNodeData(final String schemaName, final String
dataSourceName) {
return repository.get(node.getDataSourcePath(schemaName,
dataSourceName));
}
-
- /**
- * Lock instance after global lock added.
- *
- * @param event global lock added event
- */
- @Subscribe
- public synchronized void lock(final GlobalLockAddedEvent event) {
- if (Optional.of(event).isPresent()) {
- persistInstanceData(RegistryCenterNodeStatus.LOCKED.toString());
- }
- }
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeStatus.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeStatus.java
index 4f35613..83c5501 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeStatus.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNodeStatus.java
@@ -30,5 +30,10 @@ public enum RegistryCenterNodeStatus {
/**
* Locked state.
*/
- LOCKED
+ LOCKED,
+
+ /**
+ * Ok state.
+ */
+ OK
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedState.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedState.java
new file mode 100644
index 0000000..8778b14
--- /dev/null
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/state/GovernedState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.governance.core.state;
+
+import
org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
+
+import java.util.Deque;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+/**
+ * Governed state.
+ */
+public final class GovernedState {
+
+ private final Deque<RegistryCenterNodeStatus> states = new
ConcurrentLinkedDeque<>();
+
+ public GovernedState() {
+ states.push(RegistryCenterNodeStatus.OK);
+ }
+
+ /**
+ * Add state to stack and return.
+ *
+ * @param state state
+ * @return current state of instance
+ */
+ public RegistryCenterNodeStatus addState(final RegistryCenterNodeStatus
state) {
+ states.push(state);
+ return getState();
+ }
+
+ /**
+ * Get the current state of instance.
+ *
+ * @return state
+ */
+ public RegistryCenterNodeStatus getState() {
+ return Optional.of(states.peek()).orElse(RegistryCenterNodeStatus.OK);
+ }
+
+ /**
+ * Recover state to the previous and return the current state of instance.
+ *
+ * @return current state of instance
+ */
+ public RegistryCenterNodeStatus recoverState() {
+ states.pop();
+ return getState();
+ }
+}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
similarity index 84%
copy from
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
copy to
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
index 82e5388..61c7250 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/LockNodeTest.java
@@ -23,17 +23,17 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class GlobalLockNodeTest {
+public final class LockNodeTest {
- private GlobalLockNode globalLockNode;
+ private LockNode lockNode;
@Before
public void setUp() {
- globalLockNode = new GlobalLockNode();
+ lockNode = new LockNode();
}
@Test
public void getGlobalLockNodePath() {
- assertThat(globalLockNode.getGlobalLockNodePath(), is("/glock"));
+ assertThat(lockNode.getGlobalLockNodePath(), is("/glock"));
}
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
index 11113d2..ce9aa65 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.governance.core.registry;
-import
org.apache.shardingsphere.governance.core.event.model.lock.GlobalLockAddedEvent;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.junit.Before;
import org.junit.Test;
@@ -30,7 +29,6 @@ import java.util.Collections;
import java.util.List;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -82,10 +80,4 @@ public final class RegistryCenterTest {
verify(registryRepository).getChildrenKeys(anyString());
verify(registryRepository).get(anyString());
}
-
- @Test
- public void assertLock() {
- registryCenter.lock(new GlobalLockAddedEvent());
- verify(registryRepository).persist(anyString(),
eq(RegistryCenterNodeStatus.LOCKED.toString()));
- }
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/state/GovernedStateTest.java
similarity index 52%
rename from
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
rename to
shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/state/GovernedStateTest.java
index 82e5388..8950190 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/lock/node/GlobalLockNodeTest.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/test/java/org/apache/shardingsphere/governance/core/state/GovernedStateTest.java
@@ -15,25 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.governance.core.lock.node;
+package org.apache.shardingsphere.governance.core.state;
-import org.junit.Before;
+import
org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class GlobalLockNodeTest {
+public final class GovernedStateTest {
- private GlobalLockNode globalLockNode;
+ private final GovernedState governedState = new GovernedState();
- @Before
- public void setUp() {
- globalLockNode = new GlobalLockNode();
+ @Test
+ public void addState() {
+ assertThat(governedState.getState(), is(RegistryCenterNodeStatus.OK));
+ RegistryCenterNodeStatus state =
governedState.addState(RegistryCenterNodeStatus.LOCKED);
+ assertThat(state, is(RegistryCenterNodeStatus.LOCKED));
}
@Test
- public void getGlobalLockNodePath() {
- assertThat(globalLockNode.getGlobalLockNodePath(), is("/glock"));
+ public void recoverState() {
+ governedState.addState(RegistryCenterNodeStatus.LOCKED);
+ assertThat(governedState.getState(),
is(RegistryCenterNodeStatus.LOCKED));
+ assertThat(governedState.recoverState(),
is(RegistryCenterNodeStatus.OK));
+ assertThat(governedState.getState(), is(RegistryCenterNodeStatus.OK));
}
}