This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b8cc78b251 refactor xa recovery id node of registry center (#17962)
2b8cc78b251 is described below

commit 2b8cc78b251790a3289bfc8201e7ae2ecaf87700
Author: Haoran Meng <[email protected]>
AuthorDate: Thu May 26 14:31:57 2022 +0800

    refactor xa recovery id node of registry center (#17962)
    
    * refactor xa recovery id node of registry center
    
    * refactor xa recovery id node of registry center
---
 .../mode/metadata/persist/node/ComputeNode.java          | 16 +++++++++++++---
 .../persist/service/ComputeNodePersistService.java       | 11 +++++++++--
 .../mode/metadata/persist/node/ComputeNodeTest.java      |  7 ++++++-
 .../persist/service/ComputeNodePersistServiceTest.java   |  5 +++--
 .../compute/watcher/ComputeNodeStateChangedWatcher.java  | 12 ++++++++++--
 .../watcher/ComputeNodeStateChangedWatcherTest.java      | 10 ++++++++++
 6 files changed, 51 insertions(+), 10 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 9a44e8d0d6d..88e33bd6194 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -105,14 +105,24 @@ public final class ComputeNode {
         return String.join("/", "", ROOT_NODE, COMPUTE_NODE, LABELS_NODE, 
instanceId);
     }
     
+    /**
+     * Get xa recovery id node path.
+     * 
+     * @return xa recovery id node path
+     */
+    public static String getXaRecoveryIdNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, 
XA_RECOVERY_ID_NODE);
+    }
+    
     /**
      * Get compute node xa recovery id path.
      *
+     * @param xaRecoveryId recovery id
      * @param instanceId instance id
      * @return path of compute node xa recovery id
      */
-    public static String getInstanceXaRecoveryIdNodePath(final String 
instanceId) {
-        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, 
XA_RECOVERY_ID_NODE, instanceId);
+    public static String getInstanceXaRecoveryIdNodePath(final String 
xaRecoveryId, final String instanceId) {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, 
XA_RECOVERY_ID_NODE, xaRecoveryId, instanceId);
     }
     
     /**
@@ -141,7 +151,7 @@ public final class ComputeNode {
      * @return instance id
      */
     public static String getInstanceIdByComputeNode(final String 
computeNodePath) {
-        Pattern pattern = Pattern.compile(getComputeNodePath() + 
"(/status|/worker_id|/labels|/xa_recovery_id)" + "/([\\S]+)$", 
Pattern.CASE_INSENSITIVE);
+        Pattern pattern = Pattern.compile(getComputeNodePath() + 
"(/status|/worker_id|/labels)" + "/([\\S]+)$", Pattern.CASE_INSENSITIVE);
         Matcher matcher = pattern.matcher(computeNodePath);
         return matcher.find() ? matcher.group(2) : "";
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 20f6a29c261..1892b6f8601 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.mode.persist.PersistRepository;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -86,7 +87,7 @@ public final class ComputeNodePersistService {
      * @param xaRecoveryId xa recovery id
      */
     public void persistInstanceXaRecoveryId(final String instanceId, final 
String xaRecoveryId) {
-        
repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId), 
xaRecoveryId);
+        
repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId, 
instanceId), "");
     }
     
     /**
@@ -136,7 +137,13 @@ public final class ComputeNodePersistService {
      * @return xa recovery id
      */
     public Optional<String> loadXaRecoveryId(final String instanceId) {
-        return 
Optional.ofNullable(repository.get(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId)));
+        List<String> xaRecoveryIds = 
repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+        for (String xaRecoveryId : xaRecoveryIds) {
+            if (repository.getChildrenKeys(String.join("/", 
ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
+                return Optional.of(xaRecoveryId);
+            }
+        }
+        return Optional.empty();
     }
     
     /**
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 9fce9798fa0..0e9fafc99e6 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -78,7 +78,12 @@ public final class ComputeNodeTest {
     
     @Test
     public void assertGetInstanceXaRecoveryIdNodePath() {
-        
assertThat(ComputeNode.getInstanceXaRecoveryIdNodePath("127.0.0.1@3307"), 
is("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307"));
+        
assertThat(ComputeNode.getInstanceXaRecoveryIdNodePath("127.0.0.1@3307", 
"127.0.0.1@3307"), 
is("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307"));
+    }
+    
+    @Test
+    public void assertGetXaRecoveryIdNodePath() {
+        assertThat(ComputeNode.getXaRecoveryIdNodePath(), 
is("/nodes/compute_nodes/xa_recovery_id"));
     }
     
     @Test
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
index d83ecb188b7..a2ddff8c327 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
@@ -67,7 +67,7 @@ public final class ComputeNodePersistServiceTest {
         InstanceDefinition instanceDefinition = new 
InstanceDefinition(InstanceType.PROXY, 3307);
         final String instanceId = instanceDefinition.getInstanceId().getId();
         new 
ComputeNodePersistService(repository).persistInstanceXaRecoveryId(instanceId, 
instanceId);
-        
verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId),
 instanceId);
+        
verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId,
 instanceId), "");
     }
     
     @Test
@@ -99,7 +99,8 @@ public final class ComputeNodePersistServiceTest {
         InstanceDefinition instanceDefinition = new 
InstanceDefinition(InstanceType.PROXY, 3307);
         final String instanceId = instanceDefinition.getInstanceId().getId();
         new ComputeNodePersistService(repository).loadXaRecoveryId(instanceId);
-        
verify(repository).get(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId));
+        
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+        
     }
     
     @Test
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan
 [...]
index 64fa96ebc7b..537d2bf4763 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -68,14 +68,14 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
                 return Optional.of(new WorkerIdEvent(instanceId, 
Strings.isNullOrEmpty(event.getValue()) ? null : 
Long.valueOf(event.getValue())));
             } else if 
(event.getKey().equals(ComputeNode.getInstanceLabelsNodePath(instanceId))) {
                 return Optional.of(new LabelsEvent(instanceId, 
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : 
YamlEngine.unmarshal(event.getValue(), Collection.class)));
-            } else if 
(event.getKey().equals(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId)))
 {
-                return Optional.of(new XaRecoveryIdEvent(instanceId, 
Strings.isNullOrEmpty(event.getValue()) ? null : event.getValue()));
             }
         } else if 
(event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
             Optional<InstanceDefinition> instanceDefinition = 
ComputeNode.getInstanceDefinitionByInstanceOnlinePath(event.getKey());
             return instanceDefinition.isPresent() ? 
createInstanceEvent(instanceDefinition.get(), event.getType()) : 
Optional.empty();
         } else if 
(event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
             return createShowProcessListTriggerEvent(event);
+        } else if 
(event.getKey().startsWith(ComputeNode.getXaRecoveryIdNodePath())) {
+            return createXaRecoveryIdEvent(event);
         }
         return Optional.empty();
     }
@@ -106,4 +106,12 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
         }
         return Optional.empty();
     }
+    
+    private Optional<GovernanceEvent> createXaRecoveryIdEvent(final 
DataChangedEvent event) {
+        Matcher matcher = 
Pattern.compile(ComputeNode.getXaRecoveryIdNodePath() + "/([\\S]+)/([\\S]+)$", 
Pattern.CASE_INSENSITIVE).matcher(event.getKey());
+        if (matcher.find()) {
+            return Optional.of(new XaRecoveryIdEvent(matcher.group(2), 
matcher.group(1)));
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeState
 [...]
index a5504b71e8a..fb5cfc86f31 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.Test;
@@ -91,4 +92,13 @@ public final class ComputeNodeStateChangedWatcherTest {
         assertThat(((LabelsEvent) actual.get()).getLabels(), 
is(Arrays.asList("label_1", "label_2")));
         assertThat(((LabelsEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
     }
+    
+    @Test
+    public void assertCreateXaRecoveryIdEvent() {
+        Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
+                .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
 "", Type.ADDED));
+        assertTrue(actual.isPresent());
+        assertThat(((XaRecoveryIdEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
+        assertThat(((XaRecoveryIdEvent) actual.get()).getXaRecoveryId(), 
is("127.0.0.1@3307"));
+    }
 }

Reply via email to