This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6b38c87 add event watcher for instance worker id (#14758)
6b38c87 is described below
commit 6b38c877514c333c389cc17646952452ea9e8cf3
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jan 14 13:09:27 2022 +0800
add event watcher for instance worker id (#14758)
---
.../infra/instance/ComputeNodeInstance.java | 2 +-
.../shardingsphere/infra/instance/InstanceContext.java | 11 +++++++++++
.../mode/metadata/persist/node/ComputeNode.java | 8 ++++----
.../persist/service/ComputeNodePersistService.java | 6 +++---
.../mode/metadata/persist/node/ComputeNodeTest.java | 5 +++--
.../coordinator/ClusterContextManagerCoordinator.java | 15 ++++++++++++++-
.../registry/status/compute/event}/StateEvent.java | 5 +++--
.../registry/status/compute/event/WorkerIdEvent.java | 11 +++++------
.../watcher/ComputeNodeStateChangedWatcher.java | 18 ++++++++++++------
.../watcher/ComputeNodeStateChangedWatcherTest.java | 15 ++++++++-------
10 files changed, 64 insertions(+), 32 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 94876d7..87b9550 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -35,5 +35,5 @@ public final class ComputeNodeInstance {
private Collection<String> status;
- private Integer workerId;
+ private Long workerId;
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 0e3fa0f..208adb8 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -51,4 +51,15 @@ public final class InstanceContext {
private void switchInstanceState(final Collection<String> status) {
state.switchState(StateType.CIRCUIT_BREAK, null != status &&
status.contains(StateType.CIRCUIT_BREAK.name()));
}
+
+ /**
+ * Update instance worker id.
+ *
+ * <p>The stored worker id is only replaced when the given value differs from the current one.</p>
+ *
+ * @param workerId worker id, may be null
+ */
+ public void updateWorkerId(final Long workerId) {
+ // Compare boxed values by value and null-safely: != on Long compares object references, not the numbers.
+ if (!java.util.Objects.equals(workerId, instance.getWorkerId())) {
+ instance.setWorkerId(workerId);
+ }
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 108dac8..c50bf1c 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -94,12 +94,12 @@ public final class ComputeNode {
/**
* Get instance id by status path.
*
- * @param statusPath status path
+ * @param attributesPath attributes path
* @return instance id
*/
- public static String getInstanceIdByStatus(final String statusPath) {
- Pattern pattern = Pattern.compile(getAttributesNodePath() +
"/([\\S]+)/status$", Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(statusPath);
+ public static String getInstanceIdByAttributes(final String
attributesPath) {
+ Pattern pattern = Pattern.compile(getAttributesNodePath() +
"/([\\S]+)" + "(/status|/worker_id)$", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(attributesPath);
return matcher.find() ? matcher.group(1) : "";
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
index 60f9758..7c2f991 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -57,7 +57,7 @@ public final class ComputeNodePersistService {
* @param instanceId instance id
* @param workerId worker id
*/
- public void persistInstanceWorkerId(final String instanceId, final Integer
workerId) {
+ public void persistInstanceWorkerId(final String instanceId, final Long
workerId) {
repository.persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(workerId));
}
@@ -89,9 +89,9 @@ public final class ComputeNodePersistService {
* @param instanceId instance id
* @return worker id
*/
- public Integer loadInstanceWorkerId(final String instanceId) {
+ public Long loadInstanceWorkerId(final String instanceId) {
try {
- return
Integer.valueOf(repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId)));
+ return
Long.valueOf(repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId)));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
index 97104c97..5bdf58d 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNodeTest.java
@@ -53,8 +53,9 @@ public final class ComputeNodeTest {
}
@Test
- public void assertGetInstanceIdByStatus() {
-
assertThat(ComputeNode.getInstanceIdByStatus("/nodes/compute_nodes/attributes/127.0.0.1@3307/status"),
is("127.0.0.1@3307"));
+ public void assertGetInstanceIdByAttributes() {
+
assertThat(ComputeNode.getInstanceIdByAttributes("/nodes/compute_nodes/attributes/127.0.0.1@3307/status"),
is("127.0.0.1@3307"));
+
assertThat(ComputeNode.getInstanceIdByAttributes("/nodes/compute_nodes/attributes/127.0.0.1@3308/worker_id"),
is("127.0.0.1@3308"));
}
@Test
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index a37965e..fc38086 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
import
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
-import org.apache.shardingsphere.infra.state.StateEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.authority.event.AuthorityChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.SchemaChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -184,6 +185,18 @@ public final class ClusterContextManagerCoordinator {
}
}
+ /**
+ * Renew the worker id of the current instance.
+ *
+ * <p>Events whose instance id does not equal the id of the current instance are ignored.</p>
+ *
+ * @param event worker id event
+ */
+ @Subscribe
+ public synchronized void renew(final WorkerIdEvent event) {
+ // Only react to events addressed to this instance.
+ if (!contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId().getId().equals(event.getInstanceId())) {
+ return;
+ }
+ contextManager.getInstanceContext().updateWorkerId(event.getWorkerId());
+ }
+
private void persistSchema(final String schemaName) {
if
(!metaDataPersistService.getDataSourceService().isExisted(schemaName)) {
metaDataPersistService.getDataSourceService().persist(schemaName,
new LinkedHashMap<>());
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
similarity index 80%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
copy to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
index d2f9dff..79e81a3 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/StateEvent.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.state;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import java.util.Collection;
@@ -27,7 +28,7 @@ import java.util.Collection;
*/
@RequiredArgsConstructor
@Getter
-public final class StateEvent {
+public final class StateEvent implements GovernanceEvent {
private final String instanceId;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
similarity index 75%
rename from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
rename to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
index d2f9dff..ce35ca5 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/state/StateEvent.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/WorkerIdEvent.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.state;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * State event.
+ * Worker id changed event.
*/
@RequiredArgsConstructor
@Getter
-public final class StateEvent {
+public final class WorkerIdEvent implements GovernanceEvent {
private final String instanceId;
- private final Collection<String> status;
+ private final Long workerId;
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan
[...]
index 03dde36..3a58d88 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -18,9 +18,11 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.state.StateEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -34,7 +36,7 @@ import java.util.Optional;
/**
* Compute node state changed watcher.
*/
-public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<StateEvent> {
+public final class ComputeNodeStateChangedWatcher implements
GovernanceWatcher<GovernanceEvent> {
@Override
public Collection<String> getWatchingKeys() {
@@ -47,11 +49,15 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<S
}
@Override
- public Optional<StateEvent> createGovernanceEvent(final DataChangedEvent
event) {
- String instanceId = ComputeNode.getInstanceIdByStatus(event.getKey());
+ public Optional<GovernanceEvent> createGovernanceEvent(final
DataChangedEvent event) {
+ String instanceId =
ComputeNode.getInstanceIdByAttributes(event.getKey());
if (!Strings.isNullOrEmpty(instanceId)) {
- Collection<String> status =
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() :
YamlEngine.unmarshal(event.getValue(), Collection.class);
- return Optional.of(new StateEvent(instanceId, status));
+ if
(event.getKey().equals(ComputeNode.getInstanceStatusNodePath(instanceId))) {
+ Collection<String> status =
Strings.isNullOrEmpty(event.getValue()) ? new ArrayList<>() :
YamlEngine.unmarshal(event.getValue(), Collection.class);
+ return Optional.of(new StateEvent(instanceId, status));
+ } else if
(event.getKey().equals(ComputeNode.getInstanceWorkerIdNodePath(instanceId))) {
+ return Optional.of(new WorkerIdEvent(instanceId,
Strings.isNullOrEmpty(event.getValue()) ? null :
Long.valueOf(event.getValue())));
+ }
}
return Optional.empty();
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeState
[...]
index c12c494..032003a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,7 +17,8 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
-import org.apache.shardingsphere.infra.state.StateEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -35,18 +36,18 @@ public final class ComputeNodeStateChangedWatcherTest {
@Test
public void assertCreateEventWhenEnabled() {
- Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status",
+ Optional<GovernanceEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status",
YamlEngine.marshal(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())),
Type.ADDED));
assertTrue(actual.isPresent());
- assertThat(actual.get().getStatus(),
is(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())));
- assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+ assertThat(((StateEvent) actual.get()).getStatus(),
is(Arrays.asList(ComputeNodeStatus.CIRCUIT_BREAK.name())));
+ assertThat(((StateEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
}
@Test
public void assertCreateEventWhenDisabled() {
- Optional<StateEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "",
Type.UPDATED));
+ Optional<GovernanceEvent> actual = new
ComputeNodeStateChangedWatcher().createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/attributes/127.0.0.1@3307/status", "",
Type.UPDATED));
assertTrue(actual.isPresent());
- assertTrue(actual.get().getStatus().isEmpty());
- assertThat(actual.get().getInstanceId(), is("127.0.0.1@3307"));
+ assertTrue(((StateEvent) actual.get()).getStatus().isEmpty());
+ assertThat(((StateEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
}
}