This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2b8cc78b251 refactor xa recovery id node of registry center (#17962)
2b8cc78b251 is described below
commit 2b8cc78b251790a3289bfc8201e7ae2ecaf87700
Author: Haoran Meng <[email protected]>
AuthorDate: Thu May 26 14:31:57 2022 +0800
refactor xa recovery id node of registry center (#17962)
* refactor xa recovery id node of registry center
* refactor xa recovery id node of registry center
---
.../mode/metadata/persist/node/ComputeNode.java | 16 +++++++++++++---
.../persist/service/ComputeNodePersistService.java | 11 +++++++++--
.../mode/metadata/persist/node/ComputeNodeTest.java | 7 ++++++-
.../persist/service/ComputeNodePersistServiceTest.java | 5 +++--
.../compute/watcher/ComputeNodeStateChangedWatcher.java | 12 ++++++++++--
.../watcher/ComputeNodeStateChangedWatcherTest.java | 10 ++++++++++
6 files changed, 51 insertions(+), 10 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 9a44e8d0d6d..88e33bd6194 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -105,14 +105,24 @@ public final class ComputeNode {
return String.join("/", "", ROOT_NODE, COMPUTE_NODE, LABELS_NODE,
instanceId);
}
+ /**
+ * Get xa recovery id node path.
+ *
+ * @return xa recovery id node path
+ */
+ public static String getXaRecoveryIdNodePath() {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE,
XA_RECOVERY_ID_NODE);
+ }
+
/**
* Get compute node xa recovery id path.
*
+ * @param xaRecoveryId recovery id
* @param instanceId instance id
* @return path of compute node xa recovery id
*/
- public static String getInstanceXaRecoveryIdNodePath(final String
instanceId) {
- return String.join("/", "", ROOT_NODE, COMPUTE_NODE,
XA_RECOVERY_ID_NODE, instanceId);
+ public static String getInstanceXaRecoveryIdNodePath(final String
xaRecoveryId, final String instanceId) {
+ return String.join("/", "", ROOT_NODE, COMPUTE_NODE,
XA_RECOVERY_ID_NODE, xaRecoveryId, instanceId);
}
/**
@@ -141,7 +151,7 @@ public final class ComputeNode {
* @return instance id
*/
public static String getInstanceIdByComputeNode(final String
computeNodePath) {
- Pattern pattern = Pattern.compile(getComputeNodePath() +
"(/status|/worker_id|/labels|/xa_recovery_id)" + "/([\\S]+)$",
Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getComputeNodePath() +
"(/status|/worker_id|/labels)" + "/([\\S]+)$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(computeNodePath);
return matcher.find() ? matcher.group(2) : "";
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 20f6a29c261..1892b6f8601 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.mode.persist.PersistRepository;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Optional;
/**
@@ -86,7 +87,7 @@ public final class ComputeNodePersistService {
* @param xaRecoveryId xa recovery id
*/
public void persistInstanceXaRecoveryId(final String instanceId, final
String xaRecoveryId) {
-
repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId),
xaRecoveryId);
+
repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId,
instanceId), "");
}
/**
@@ -136,7 +137,13 @@ public final class ComputeNodePersistService {
* @return xa recovery id
*/
public Optional<String> loadXaRecoveryId(final String instanceId) {
- return
Optional.ofNullable(repository.get(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId)));
+ List<String> xaRecoveryIds =
repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+ for (String xaRecoveryId : xaRecoveryIds) {
+ if (repository.getChildrenKeys(String.join("/",
ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
+ return Optional.of(xaRecoveryId);
+ }
+ }
+ return Optional.empty();
}
/**
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 9fce9798fa0..0e9fafc99e6 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -78,7 +78,12 @@ public final class ComputeNodeTest {
@Test
public void assertGetInstanceXaRecoveryIdNodePath() {
-
assertThat(ComputeNode.getInstanceXaRecoveryIdNodePath("127.0.0.1@3307"),
is("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307"));
+
assertThat(ComputeNode.getInstanceXaRecoveryIdNodePath("127.0.0.1@3307",
"127.0.0.1@3307"),
is("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307"));
+ }
+
+ @Test
+ public void assertGetXaRecoveryIdNodePath() {
+ assertThat(ComputeNode.getXaRecoveryIdNodePath(),
is("/nodes/compute_nodes/xa_recovery_id"));
}
@Test
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
index d83ecb188b7..a2ddff8c327 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
@@ -67,7 +67,7 @@ public final class ComputeNodePersistServiceTest {
InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
final String instanceId = instanceDefinition.getInstanceId().getId();
new
ComputeNodePersistService(repository).persistInstanceXaRecoveryId(instanceId,
instanceId);
-
verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId),
instanceId);
+
verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId,
instanceId), "");
}
@Test
@@ -99,7 +99,8 @@ public final class ComputeNodePersistServiceTest {
InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
final String instanceId = instanceDefinition.getInstanceId().getId();
new ComputeNodePersistService(repository).loadXaRecoveryId(instanceId);
-
verify(repository).get(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId));
+
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+
}
@Test
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 64fa96ebc7b..537d2bf4763 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -68,14 +68,14 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
return Optional.of(new WorkerIdEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? null :
Long.valueOf(event.getValue())));
} else if
(event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
return Optional.of(new LabelsEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() :
YamlEngine.unmarshal(event.getValue(), Collection.class)));
- } else if
(event.getKey().equals(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId)))
{
- return Optional.of(new XaRecoveryIdEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? null : event.getValue()));
}
} else if
(event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
Optional<InstanceDefinition> instanceDefinition =
ComputeNode.getInstanceDefinitionByInstanceOnlinePath(event.getKey());
return instanceDefinition.isPresent() ?
createInstanceEvent(instanceDefinition.get(), event.getType()) :
Optional.empty();
} else if
(event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
return createShowProcessListTriggerEvent(event);
+ } else if
(event.getKey().startsWith(ComputeNode.getXaRecoveryIdNodePath())) {
+ return createXaRecoveryIdEvent(event);
}
return Optional.empty();
}
@@ -106,4 +106,12 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
}
return Optional.empty();
}
+
+ private Optional<GovernanceEvent> createXaRecoveryIdEvent(final
DataChangedEvent event) {
+ Matcher matcher =
Pattern.compile(ComputeNode.getXaRecoveryIdNodePath() + "/([\\S]+)/([\\S]+)$",
Pattern.CASE_INSENSITIVE).matcher(event.getKey());
+ if (matcher.find()) {
+ return Optional.of(new XaRecoveryIdEvent(matcher.group(2),
matcher.group(1)));
+ }
+ return Optional.empty();
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index a5504b71e8a..fb5cfc86f31 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.Test;
@@ -91,4 +92,13 @@ public final class ComputeNodeStateChangedWatcherTest {
assertThat(((LabelsEvent) actual.get()).getLabels(),
is(Arrays.asList("label_1", "label_2")));
assertThat(((LabelsEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
}
+
+ @Test
+ public void assertCreateXaRecoveryIdEvent() {
+ Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
+ .createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
"", Type.ADDED));
+ assertTrue(actual.isPresent());
+ assertThat(((XaRecoveryIdEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
+ assertThat(((XaRecoveryIdEvent) actual.get()).getXaRecoveryId(),
is("127.0.0.1@3307"));
+ }
}