This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b38c87  add event watcher for instance worker id (#14758)
6b38c87 is described below

commit 6b38c877514c333c389cc17646952452ea9e8cf3
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jan 14 13:09:27 2022 +0800

    add event watcher for instance worker id (#14758)
---
 .../infra/instance/ComputeNodeInstance.java            |  2 +-
 .../shardingsphere/infra/instance/InstanceContext.java | 11 +++++++++++
 .../mode/metadata/persist/node/ComputeNode.java        |  8 ++++----
 .../persist/service/ComputeNodePersistService.java     |  6 +++---
 .../mode/metadata/persist/node/ComputeNodeTest.java    |  5 +++--
 .../coordinator/ClusterContextManagerCoordinator.java  | 15 ++++++++++++++-
 .../registry/status/compute/event}/StateEvent.java     |  5 +++--
 .../registry/status/compute/event/WorkerIdEvent.java   | 11 +++++------
 .../watcher/ComputeNodeStateChangedWatcher.java        | 18 ++++++++++++------
 .../watcher/ComputeNodeStateChangedWatcherTest.java    | 15 ++++++++-------
 10 files changed, 64 insertions(+), 32 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 94876d7..87b9550 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -35,5 +35,5 @@ public final class ComputeNodeInstance {
     
     private Collection<String> status;
     
-    private Integer workerId;
+    private Long workerId;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 0e3fa0f..208adb8 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -51,4 +51,15 @@ public final class InstanceContext {
     private void switchInstanceState(final Collection<String> status) {
         state.switchState(StateType.CIRCUIT_BREAK, null != status && 
status.contains(StateType.CIRCUIT_BREAK.name()));
     }
+    
+    /**
+     * Update instance worker id.
+     * 
+     * @param workerId worker id
+     */
+    public void updateWorkerId(final Long workerId) {
+        if (workerId != instance.getWorkerId()) {
+            instance.setWorkerId(workerId);
+        }
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 108dac8..c50bf1c 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -94,12 +94,12 @@ public final class ComputeNode {
     /**
      * Get instance id by status path.
      * 
-     * @param statusPath status path
+     * @param attributesPath attributes path
      * @return instance id
      */
-    public static String getInstanceIdByStatus(final String statusPath) {
-        Pattern pattern = Pattern.compile(getAttributesNodePath() + 
"/([\\S]+)/status$", Pattern.CASE_INSENSITIVE);
-        Matcher matcher = pattern.matcher(statusPath);
+    public static String getInstanceIdByAttributes(final String 
attributesPath) {
+        Pattern pattern = Pattern.compile(getAttributesNodePath() + 
"/([\\S]+)" + "(/status|/worker_id)$", Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(attributesPath);
         return matcher.find() ? matcher.group(1) : "";
     }
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 60f9758..7c2f991 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -57,7 +57,7 @@ public final class ComputeNodePersistService {
      * @param instanceId instance id
      * @param workerId worker id
      */
-    public void persistInstanceWorkerId(final String instanceId, final Integer 
workerId) {
+    public void persistInstanceWorkerId(final String instanceId, final Long 
workerId) {
         
repository.persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId), 
String.valueOf(workerId));
     }
     
@@ -89,9 +89,9 @@ public final class ComputeNodePersistService {
      * @param instanceId instance id
      * @return worker id
      */
-    public Integer loadInstanceWorkerId(final String instanceId) {
+    public Long loadInstanceWorkerId(final String instanceId) {
         try {
-            return 
Integer.valueOf(repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId)));
+            return 
Long.valueOf(repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId)));
         } catch (final NumberFormatException ex) {
             log.error("Invalid worker id for instance: {}", instanceId);
         }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 97104c97..5bdf58d 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -53,8 +53,9 @@ public final class ComputeNodeTest {
     }
     
     @Test
-    public void assertGetInstanceIdByStatus() {
-        
assertThat(ComputeNode.getInstanceIdByStatus("/nodes/compute_nodes/attributes/127.0.0.1@3307/status"),
 is("127.0.0.1@3307"));
+    public void assertGetInstanceIdByAttributes() {
+        
assertThat(ComputeNode.getInstanceIdByAttributes("/nodes/compute_nodes/attributes/127.0.0.1@3307/status"),
 is("127.0.0.1@3307"));
+        
assertThat(ComputeNode.getInstanceIdByAttributes("/nodes/compute_nodes/attributes/127.0.0.1@3308/worker_id"),
 is("127.0.0.1@3308"));
     }
     
     @Test
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index a37965e..fc38086 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
 import 
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
 import 
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
-import org.apache.shardingsphere.infra.state.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.authority.event.AuthorityChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -184,6 +185,18 @@ public final class ClusterContextManagerCoordinator {
         }
     }
     
+    /**
+     * Renew instance worker id.
+     *
+     * @param event worker id event
+     */
+    @Subscribe
+    public synchronized void renew(final WorkerIdEvent event) {
+        if 
(contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId().getId().equals(event.getInstanceId()))
 {
+            
contextManager.getInstanceContext().updateWorkerId(event.getWorkerId());
+        }
+    }
+    
     private void persistSchema(final String schemaName) {
         if 
(!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
             metaDataPersistService.getDataSourceService().persist(schemaName, 
new LinkedHashMap<>());
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
similarity index 80%
copy from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
copy to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
index d2f9dff..79e81a3 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.state;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 import java.util.Collection;
 
@@ -27,7 +28,7 @@ import java.util.Collection;
  */
 @RequiredArgsConstructor
 @Getter
-public final class StateEvent {
+public final class StateEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
similarity index 75%
rename from 
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
rename to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
index d2f9dff..ce35ca5 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
@@ -15,21 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.infra.state;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * State event.
+ * Worker id changed event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class StateEvent {
+public final class WorkerIdEvent implements GovernanceEvent {
     
     private final String instanceId;
     
-    private final Collection<String> status;
+    private final Long workerId; 
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan
 [...]
index 03dde36..3a58d88 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -18,9 +18,11 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
 
 import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.state.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -34,7 +36,7 @@ import java.util.Optional;
 /**
  * Compute node state changed watcher.
  */
-public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<StateEvent> {
+public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
     public Collection<String> getWatchingKeys() {
@@ -47,11 +49,15 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<S
     }
     
     @Override
-    public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent 
event) {
-        String instanceId = ComputeNode.getInstanceIdByStatus(event.getKey());
+    public Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
+        String instanceId = 
ComputeNode.getInstanceIdByAttributes(event.getKey());
         if (!Strings.isNullOrEmpty(instanceId)) {
-            Collection<String> status = 
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : 
YamlEngine.unmarshal(event.getValue(), Collection.class);
-            return Optional.of(new StateEvent(instanceId, status));
+            if 
(event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId))) {
+                Collection<String> status = 
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() : 
YamlEngine.unmarshal(event.getValue(), Collection.class);
+                return Optional.of(new StateEvent(instanceId, status));
+            } else if 
(event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
+                return Optional.of(new WorkerIdEvent(instanceId, 
Strings.isNullOrEmpty(event.getValue()) ? null : 
Long.valueOf(event.getValue())));
+            }
         }
         return Optional.empty();
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeState
 [...]
index c12c494..032003a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,7 +17,8 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
 
-import org.apache.shardingsphere.infra.state.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -35,18 +36,18 @@ public final class ComputeNodeStateChangedWatcherTest {
     
     @Test
     public void assertCreateEventWhenEnabled() {
-        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", 
+        Optional<GovernanceEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", 
                 
YamlEngine.marshal(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())), 
Type.ADDED));
         assertTrue(actual.isPresent());
-        assertThat(actual.get().getStatus(), 
is(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())));
-        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+        assertThat(((StateEvent) actual.get()).getStatus(), 
is(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())));
+        assertThat(((StateEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
     }
     
     @Test
     public void assertCreateEventWhenDisabled() {
-        Optional<StateEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "", 
Type.UPDATED));
+        Optional<GovernanceEvent> actual = new 
ComputeNodeStateChangedWatcher().createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "", 
Type.UPDATED));
         assertTrue(actual.isPresent());
-        assertTrue(actual.get().getStatus().isEmpty());
-        assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+        assertTrue(((StateEvent) actual.get()).getStatus().isEmpty());
+        assertThat(((StateEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
     }
 }

Reply via email to