This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 774dad4b282 Refactor ComputeNodeStatusService.registerOnline() (#31301)
774dad4b282 is described below
commit 774dad4b28271b718bfefea68a8ab804a7088a65
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 19 19:30:50 2024 +0800
Refactor ComputeNodeStatusService.registerOnline() (#31301)
* Refactor ShowTablesExecutor
* Refactor ComputeNodeStatusService.registerOnline()
* Refactor ComputeNodeStatusService.registerOnline()
---
.../infra/instance/ComputeNodeInstance.java | 9 +++-----
.../infra/instance/InstanceContext.java | 22 +++++++++---------
.../DriverDatabaseConnectionManager.java | 2 +-
.../DriverDatabaseConnectionManagerTest.java | 2 +-
.../traffic/engine/TrafficEngine.java | 2 +-
.../algorithm/engine/TrafficEngineTest.java | 4 ++--
.../cluster/ClusterContextManagerBuilder.java | 7 ++----
.../compute/service/ComputeNodeStatusService.java | 26 ++++++++--------------
.../service/ComputeNodeStatusServiceTest.java | 19 +++++-----------
.../subscriber/StateChangedSubscriberTest.java | 12 +++++-----
.../SessionConnectionReconnectListener.java | 8 +------
.../ral/queryable/ShowComputeNodesExecutor.java | 8 +++----
.../queryable/ShowComputeNodesExecutorTest.java | 2 +-
13 files changed, 47 insertions(+), 76 deletions(-)
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 8112f222e5b..6c16f75b64c 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.infra.instance;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
/**
* Instance of compute node.
*/
+@RequiredArgsConstructor
@Getter
public final class ComputeNodeInstance {
@@ -39,12 +41,7 @@ public final class ComputeNodeInstance {
private Collection<String> labels = new LinkedList<>();
@Setter
- private volatile int workerId;
-
- public ComputeNodeInstance(final InstanceMetaData metaData) {
- this.metaData = metaData;
- workerId = -1;
- }
+ private volatile int workerId = -1;
/**
* Set labels.
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 7bd7c3be30e..d36e9fe74da 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -56,7 +56,7 @@ public final class InstanceContext {
private final EventBusContext eventBusContext;
- private final Collection<ComputeNodeInstance> allClusterInstances = new
LinkedList<>();
+ private final Collection<ComputeNodeInstance>
allClusterComputeNodeInstances = new LinkedList<>();
/**
* Update instance status.
@@ -72,7 +72,7 @@ public final class InstanceContext {
}
private void updateRelatedComputeNodeInstancesStatus(final String
instanceId, final String status) {
- for (ComputeNodeInstance each : allClusterInstances) {
+ for (ComputeNodeInstance each : allClusterComputeNodeInstances) {
if (each.getMetaData().getId().equals(instanceId)) {
each.switchState(status);
}
@@ -89,7 +89,7 @@ public final class InstanceContext {
if (instance.getMetaData().getId().equals(instanceId)) {
instance.setWorkerId(workerId);
}
- allClusterInstances.stream().filter(each ->
each.getMetaData().getId().equals(instanceId)).forEach(each ->
each.setWorkerId(workerId));
+ allClusterComputeNodeInstances.stream().filter(each ->
each.getMetaData().getId().equals(instanceId)).forEach(each ->
each.setWorkerId(workerId));
}
/**
@@ -102,7 +102,7 @@ public final class InstanceContext {
if (instance.getMetaData().getId().equals(instanceId)) {
instance.setLabels(labels);
}
- allClusterInstances.stream().filter(each ->
each.getMetaData().getId().equals(instanceId)).forEach(each ->
each.setLabels(labels));
+ allClusterComputeNodeInstances.stream().filter(each ->
each.getMetaData().getId().equals(instanceId)).forEach(each ->
each.setLabels(labels));
}
/**
@@ -132,8 +132,8 @@ public final class InstanceContext {
* @param instance compute node instance
*/
public void addComputeNodeInstance(final ComputeNodeInstance instance) {
- allClusterInstances.removeIf(each ->
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
- allClusterInstances.add(instance);
+ allClusterComputeNodeInstances.removeIf(each ->
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
+ allClusterComputeNodeInstances.add(instance);
}
/**
@@ -142,7 +142,7 @@ public final class InstanceContext {
* @param instance compute node instance
*/
public void deleteComputeNodeInstance(final ComputeNodeInstance instance) {
- allClusterInstances.removeIf(each ->
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
+ allClusterComputeNodeInstances.removeIf(each ->
each.getMetaData().getId().equalsIgnoreCase(instance.getMetaData().getId()));
}
/**
@@ -152,9 +152,9 @@ public final class InstanceContext {
* @param labels collection of contained label
* @return compute node instances
*/
- public Map<String, InstanceMetaData> getAllClusterInstances(final
InstanceType instanceType, final Collection<String> labels) {
- Map<String, InstanceMetaData> result = new
LinkedHashMap<>(allClusterInstances.size(), 1F);
- for (ComputeNodeInstance each : allClusterInstances) {
+ public Map<String, InstanceMetaData>
getAllClusterComputeNodeInstances(final InstanceType instanceType, final
Collection<String> labels) {
+ Map<String, InstanceMetaData> result = new
LinkedHashMap<>(allClusterComputeNodeInstances.size(), 1F);
+ for (ComputeNodeInstance each : allClusterComputeNodeInstances) {
if (each.getMetaData().getType() == instanceType &&
labels.stream().anyMatch(((Collection<String>) each.getLabels())::contains)) {
result.put(each.getMetaData().getId(), each.getMetaData());
}
@@ -169,7 +169,7 @@ public final class InstanceContext {
* @return compute node instance
*/
public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final
String instanceId) {
- return allClusterInstances.stream().filter(each ->
instanceId.equals(each.getMetaData().getId())).findFirst();
+ return allClusterComputeNodeInstances.stream().filter(each ->
instanceId.equals(each.getMetaData().getId())).findFirst();
}
/**
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
index ee3ed656142..a5090d7dde0 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManager.java
@@ -116,7 +116,7 @@ public final class DriverDatabaseConnectionManager implements OnlineDatabaseConn
DataSourcePoolProperties propsSample =
propsMap.values().iterator().next();
Collection<ShardingSphereUser> users =
contextManager.getMetaDataContexts().getMetaData()
.getGlobalRuleMetaData().getSingleRule(AuthorityRule.class).getConfiguration().getUsers();
- Collection<InstanceMetaData> instances =
contextManager.getInstanceContext().getAllClusterInstances(InstanceType.PROXY,
rule.getLabels()).values();
+ Collection<InstanceMetaData> instances =
contextManager.getInstanceContext().getAllClusterComputeNodeInstances(InstanceType.PROXY,
rule.getLabels()).values();
return
DataSourcePoolCreator.create(createDataSourcePoolPropertiesMap(instances,
users, propsSample, actualDatabaseName), true);
}
diff --git a/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java b/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
index e1d7251ac4f..bf26316c1ec 100644
--- a/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
+++ b/jdbc/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/DriverDatabaseConnectionManagerTest.java
@@ -82,7 +82,7 @@ class DriverDatabaseConnectionManagerTest {
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(
new RuleMetaData(Arrays.asList(mock(AuthorityRule.class,
RETURNS_DEEP_STUBS), mock(TransactionRule.class, RETURNS_DEEP_STUBS),
mock(TrafficRule.class, RETURNS_DEEP_STUBS))));
-
when(result.getInstanceContext().getAllClusterInstances(InstanceType.PROXY,
Arrays.asList("OLTP", "OLAP"))).thenReturn(
+
when(result.getInstanceContext().getAllClusterComputeNodeInstances(InstanceType.PROXY,
Arrays.asList("OLTP", "OLAP"))).thenReturn(
Collections.singletonMap("foo_id", new
ProxyInstanceMetaData("foo_id", "127.0.0.1@3307", "foo_version")));
Map<String, DataSource> trafficDataSourceMap =
mockTrafficDataSourceMap();
when(DataSourcePoolCreator.create(any(),
eq(true))).thenReturn(trafficDataSourceMap);
diff --git a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 3f28496745c..1c2672cae87 100644
--- a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -52,7 +52,7 @@ public final class TrafficEngine {
if (!strategyRule.isPresent() ||
isInvalidStrategyRule(strategyRule.get())) {
return Optional.empty();
}
- Map<String, InstanceMetaData> instances =
instanceContext.getAllClusterInstances(InstanceType.PROXY,
strategyRule.get().getLabels());
+ Map<String, InstanceMetaData> instances =
instanceContext.getAllClusterComputeNodeInstances(InstanceType.PROXY,
strategyRule.get().getLabels());
if (!instances.isEmpty()) {
LoadBalanceAlgorithm loadBalancer =
strategyRule.get().getLoadBalancer();
String instanceId = 1 == instances.size() ?
instances.keySet().iterator().next() :
loadBalancer.getTargetName(strategyRule.get().getName(), new
ArrayList<>(instances.keySet()));
diff --git a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/algorithm/engine/TrafficEngineTest.java b/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/algorithm/engine/TrafficEngineTest.java
index 0c9b806fee6..8156468a6ca 100644
--- a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/algorithm/engine/TrafficEngineTest.java
+++ b/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/algorithm/engine/TrafficEngineTest.java
@@ -96,13 +96,13 @@ class TrafficEngineTest {
when(loadBalancer.getTargetName("traffic", new
ArrayList<>(instanceIds.keySet()))).thenReturn("foo_id");
when(strategyRule.getLoadBalancer()).thenReturn(loadBalancer);
when(strategyRule.getName()).thenReturn("traffic");
- when(instanceContext.getAllClusterInstances(InstanceType.PROXY,
Arrays.asList("OLTP", "OLAP"))).thenReturn(instanceIds);
+
when(instanceContext.getAllClusterComputeNodeInstances(InstanceType.PROXY,
Arrays.asList("OLTP", "OLAP"))).thenReturn(instanceIds);
Optional<String> actual = trafficEngine.dispatch(queryContext, false);
assertThat(actual, is(Optional.of("foo_id")));
}
private Map<String, InstanceMetaData> mockComputeNodeInstances() {
- Map<String, InstanceMetaData> result = new HashMap<>();
+ Map<String, InstanceMetaData> result = new HashMap<>(2, 1F);
result.put("foo_id", new ProxyInstanceMetaData("foo_id",
"127.0.0.1@3307", "foo_version"));
result.put("bar_id", new ProxyInstanceMetaData("bar_id",
"127.0.0.1@3308", "foo_version"));
return result;
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index bc27480d83e..eeb1ad28c93 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -114,14 +114,11 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
private void registerOnline(final EventBusContext eventBusContext, final
InstanceContext instanceContext,
final ClusterPersistRepository repository,
final ContextManagerBuilderParameter param, final ContextManager
contextManager) {
- ComputeNodeStatusService computeNodeStatusService = new
ComputeNodeStatusService(repository);
-
computeNodeStatusService.registerOnline(instanceContext.getInstance().getMetaData());
-
computeNodeStatusService.persistInstanceLabels(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getLabels());
-
computeNodeStatusService.persistInstanceState(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getState());
+ new
ComputeNodeStatusService(repository).registerOnline(instanceContext.getInstance());
new GovernanceWatcherFactory(repository,
eventBusContext, param.getInstanceMetaData() instanceof
JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() :
Collections.emptyList()).watchListeners();
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
-
contextManager.getInstanceContext().getAllClusterInstances().addAll(new
ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
+
contextManager.getInstanceContext().getAllClusterComputeNodeInstances().addAll(new
ComputeNodeStatusService(repository).loadAllComputeNodeInstances());
new ClusterEventSubscriberRegistry(contextManager,
repository).register();
}
diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index a32f544d43d..4f6fc8249fc 100644
--- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.InstanceMetaDataFactory;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -45,20 +44,23 @@ public final class ComputeNodeStatusService {
private final ClusterPersistRepository repository;
/**
- * Register online.
+ * Register compute node online.
*
- * @param instanceMetaData instance definition
+ * @param computeNodeInstance compute node instance
*/
- public void registerOnline(final InstanceMetaData instanceMetaData) {
-
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceMetaData.getId(),
instanceMetaData.getType()),
- YamlEngine.marshal(new
ComputeNodeData(instanceMetaData.getAttributes(),
instanceMetaData.getVersion())));
+ public void registerOnline(final ComputeNodeInstance computeNodeInstance) {
+ String instanceId = computeNodeInstance.getMetaData().getId();
+
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceId,
computeNodeInstance.getMetaData().getType()),
+ YamlEngine.marshal(new
ComputeNodeData(computeNodeInstance.getMetaData().getAttributes(),
computeNodeInstance.getMetaData().getVersion())));
+
repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId),
computeNodeInstance.getState().getCurrentState().name());
+ persistInstanceLabels(instanceId, computeNodeInstance.getLabels());
}
/**
* Persist instance labels.
*
* @param instanceId instance id
- * @param labels collection of label
+ * @param labels instance labels
*/
public void persistInstanceLabels(final String instanceId, final
Collection<String> labels) {
if (null != labels) {
@@ -66,16 +68,6 @@ public final class ComputeNodeStatusService {
}
}
- /**
- * Persist instance state.
- *
- * @param instanceId instance id
- * @param state state context
- */
- public void persistInstanceState(final String instanceId, final
InstanceStateContext state) {
-
repository.persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId),
state.getCurrentState().name());
- }
-
/**
* Persist instance worker id.
*
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
index 32f98de4c14..62459833c2d 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.instance.util.IpUtils;
-import org.apache.shardingsphere.infra.state.instance.InstanceStateContext;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
@@ -55,9 +54,12 @@ class ComputeNodeStatusServiceTest {
@Test
void assertRegisterOnline() {
- InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
- new
ComputeNodeStatusService(repository).registerOnline(instanceMetaData);
-
verify(repository).persistEphemeral(eq("/nodes/compute_nodes/online/proxy/" +
instanceMetaData.getId()), anyString());
+ ComputeNodeInstance computeNodeInstance = new ComputeNodeInstance(new
ProxyInstanceMetaData("foo_instance_id", 3307));
+ computeNodeInstance.setLabels(Collections.singletonList("test"));
+ new
ComputeNodeStatusService(repository).registerOnline(computeNodeInstance);
+
verify(repository).persistEphemeral(eq("/nodes/compute_nodes/online/proxy/" +
computeNodeInstance.getMetaData().getId()), anyString());
+
verify(repository).persistEphemeral(ComputeNode.getInstanceStatusNodePath(computeNodeInstance.getMetaData().getId()),
InstanceState.OK.name());
+
verify(repository).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(computeNodeInstance.getMetaData().getId()),
YamlEngine.marshal(Collections.singletonList("test")));
}
@Test
@@ -71,15 +73,6 @@ class ComputeNodeStatusServiceTest {
verify(repository).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(Collections.emptyList()));
}
- @Test
- void assertPersistInstanceState() {
- ComputeNodeStatusService computeNodeStatusService = new
ComputeNodeStatusService(repository);
- InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
- final String instanceId = instanceMetaData.getId();
- computeNodeStatusService.persistInstanceState(instanceId, new
InstanceStateContext());
-
verify(repository).persistEphemeral(ComputeNode.getInstanceStatusNodePath(instanceId),
InstanceState.OK.name());
- }
-
@Test
void assertPersistInstanceWorkerId() {
InstanceMetaData instanceMetaData = new
ProxyInstanceMetaData("foo_instance_id", 3307);
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 8541f12ba34..adac8980c0b 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -165,15 +165,15 @@ class StateChangedSubscriberTest {
InstanceMetaData instanceMetaData1 = new
ProxyInstanceMetaData("foo_instance_3307", 3307);
InstanceOnlineEvent instanceOnlineEvent1 = new
InstanceOnlineEvent(instanceMetaData1);
subscriber.renew(instanceOnlineEvent1);
-
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(),
is(1));
- assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(),
is(instanceMetaData1));
+
assertThat(contextManager.getInstanceContext().getAllClusterComputeNodeInstances().size(),
is(1));
+ assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterComputeNodeInstances()).get(0).getMetaData(),
is(instanceMetaData1));
InstanceMetaData instanceMetaData2 = new
ProxyInstanceMetaData("foo_instance_3308", 3308);
InstanceOnlineEvent instanceOnlineEvent2 = new
InstanceOnlineEvent(instanceMetaData2);
subscriber.renew(instanceOnlineEvent2);
-
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(),
is(2));
- assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
is(instanceMetaData2));
+
assertThat(contextManager.getInstanceContext().getAllClusterComputeNodeInstances().size(),
is(2));
+ assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterComputeNodeInstances()).get(1).getMetaData(),
is(instanceMetaData2));
subscriber.renew(instanceOnlineEvent1);
-
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(),
is(2));
- assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
is(instanceMetaData1));
+
assertThat(contextManager.getInstanceContext().getAllClusterComputeNodeInstances().size(),
is(2));
+ assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getAllClusterComputeNodeInstances()).get(1).getMetaData(),
is(instanceMetaData1));
}
}
diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
index bbba6569415..d12c3883c52 100644
--- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
+++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/listener/SessionConnectionReconnectListener.java
@@ -63,7 +63,7 @@ public final class SessionConnectionReconnectListener implements ConnectionState
if (isNeedGenerateWorkerId()) {
instanceContext.generateWorkerId(new Properties());
}
- reRegisterInstanceComputeNode();
+
computeNodeStatusService.registerOnline(instanceContext.getInstance());
return true;
}
sleepInterval();
@@ -78,12 +78,6 @@ public final class SessionConnectionReconnectListener implements ConnectionState
return -1 != instanceContext.getInstance().getWorkerId();
}
- private void reRegisterInstanceComputeNode() {
-
computeNodeStatusService.registerOnline(instanceContext.getInstance().getMetaData());
-
computeNodeStatusService.persistInstanceLabels(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getLabels());
-
computeNodeStatusService.persistInstanceState(instanceContext.getInstance().getMetaData().getId(),
instanceContext.getInstance().getState());
- }
-
@SneakyThrows(InterruptedException.class)
private void sleepInterval() {
Thread.sleep(RECONNECT_INTERVAL_SECONDS * 1000L);
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
index 80ba0529a2e..ec25cf6f129 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutor.java
@@ -43,11 +43,9 @@ public final class ShowComputeNodesExecutor implements DistSQLQueryExecutor<Show
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowComputeNodesStatement sqlStatement, final ContextManager contextManager) {
String modeType =
contextManager.getInstanceContext().getModeConfiguration().getType();
- if ("Standalone".equals(modeType)) {
- return
Collections.singleton(buildRow(contextManager.getInstanceContext().getInstance(),
modeType));
- }
- Collection<ComputeNodeInstance> instances =
contextManager.getInstanceContext().getAllClusterInstances();
- return instances.stream().map(each -> buildRow(each,
modeType)).collect(Collectors.toList());
+ return "Standalone".equals(modeType)
+ ?
Collections.singleton(buildRow(contextManager.getInstanceContext().getInstance(),
modeType))
+ :
contextManager.getInstanceContext().getAllClusterComputeNodeInstances().stream().map(each
-> buildRow(each, modeType)).collect(Collectors.toList());
}
private LocalDataQueryResultRow buildRow(final ComputeNodeInstance
instance, final String modeType) {
diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
index 5efd35cd1e0..5d1b1845dbd 100644
--- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
+++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowComputeNodesExecutorTest.java
@@ -97,7 +97,7 @@ class ShowComputeNodesExecutorTest {
when(computeNodeInstance.getMetaData()).thenReturn(new
ProxyInstanceMetaData("foo", "127.0.0.1@3309", "foo_version"));
when(computeNodeInstance.getState()).thenReturn(new
InstanceStateContext());
when(computeNodeInstance.getWorkerId()).thenReturn(1);
-
when(result.getAllClusterInstances()).thenReturn(Collections.singleton(computeNodeInstance));
+
when(result.getAllClusterComputeNodeInstances()).thenReturn(Collections.singleton(computeNodeInstance));
return result;
}
}