This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc1be3f8918 refactor all compute status node to ephemeral (#17980)
bc1be3f8918 is described below
commit bc1be3f891824293c817910c9e5c08ddc7ff7820
Author: Haoran Meng <[email protected]>
AuthorDate: Thu May 26 22:06:46 2022 +0800
refactor all compute status node to ephemeral (#17980)
---
.../infra/instance/InstanceContext.java | 10 ++
.../src/test/resources/logback-test.xml | 2 +-
.../metadata/persist/MetaDataPersistService.java | 19 +--
.../persist/service/ComputeNodePersistService.java | 178 ---------------------
.../persist/MetaDataPersistServiceTest.java | 17 +-
.../service/ComputeNodePersistServiceTest.java | 121 --------------
.../cluster/ClusterContextManagerBuilder.java | 29 ++--
.../ClusterContextManagerCoordinator.java | 5 +-
.../cluster/coordinator/RegistryCenter.java | 10 +-
.../compute/service/ComputeNodeStatusService.java | 145 +++++++++++++++++
.../generator/ClusterWorkerIdGenerator.java | 8 +-
.../ClusterContextManagerCoordinatorTest.java | 4 +-
.../service/ComputeNodeStatusServiceTest.java | 86 ++++++++++
.../src/test/resources/logback-test.xml | 2 +-
.../StandaloneContextManagerBuilder.java | 9 +-
.../src/test/resources/logback-test.xml | 2 +-
.../ral/common/updatable/AlterInstanceHandler.java | 16 +-
.../ral/common/updatable/LabelInstanceHandler.java | 17 +-
.../common/updatable/SetInstanceStatusHandler.java | 54 ++-----
.../common/updatable/UnlabelInstanceHandler.java | 22 +--
.../src/test/resources/logback-test.xml | 2 +-
21 files changed, 323 insertions(+), 435 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 3334a29eba5..5d2de4472a5 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -167,6 +167,16 @@ public final class InstanceContext {
return result;
}
+ /**
+ * Get compute node instance by instance id.
+ *
+ * @param instanceId instance id
+ * @return compute node instance
+ */
+ public Optional<ComputeNodeInstance> getComputeNodeInstanceById(final
String instanceId) {
+ return computeNodeInstances.stream().filter(each ->
instanceId.equals(each.getCurrentInstanceId())).findFirst();
+ }
+
/**
* Init lock context.
*/
diff --git
a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
---
a/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
+++
b/shardingsphere-jdbc/shardingsphere-jdbc-spring/shardingsphere-jdbc-core-spring/shardingsphere-jdbc-core-spring-namespace/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger
name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext"
level="error" />
- <logger
name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService"
level="off" />
+ <logger
name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService"
level="off" />
<root>
<level value="error" />
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index 8991d107d96..63a27d47825 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -25,13 +25,12 @@ import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
import
org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
-import
org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
-import
org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.DatabaseVersionPersistService;
+import
org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
+import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPersistService;
-import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
@@ -61,8 +60,6 @@ public final class MetaDataPersistService {
private final PropertiesPersistService propsService;
- private final ComputeNodePersistService computeNodePersistService;
-
private final DatabaseVersionPersistService databaseVersionPersistService;
public MetaDataPersistService(final PersistRepository repository) {
@@ -72,7 +69,6 @@ public final class MetaDataPersistService {
databaseRulePersistService = new
DatabaseRulePersistService(repository);
globalRuleService = new GlobalRulePersistService(repository);
propsService = new PropertiesPersistService(repository);
- computeNodePersistService = new ComputeNodePersistService(repository);
databaseVersionPersistService = new
DatabaseVersionPersistService(repository);
}
@@ -103,17 +99,6 @@ public final class MetaDataPersistService {
return result;
}
- /**
- * Persist instance labels.
- *
- * @param instanceId instance id
- * @param labels labels
- * @param isOverwrite whether overwrite registry center's configuration if
existed
- */
- public void persistInstanceLabels(final String instanceId, final
Collection<String> labels, final boolean isOverwrite) {
- computeNodePersistService.persistInstanceLabels(instanceId, labels,
isOverwrite);
- }
-
/**
* Get effective data sources.
*
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
deleted file mode 100644
index b89839ebbbc..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.metadata.persist.service;
-
-import com.google.common.base.Strings;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.persist.PersistRepository;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Compute node persist service.
- */
-@Slf4j
-@RequiredArgsConstructor
-public final class ComputeNodePersistService {
-
- private final PersistRepository repository;
-
- /**
- * Persist instance labels.
- *
- * @param instanceId instance id
- * @param labels collection of label
- * @param isOverwrite whether overwrite registry center's configuration if
existed
- */
- public void persistInstanceLabels(final String instanceId, final
Collection<String> labels, final boolean isOverwrite) {
- if (null != labels && !labels.isEmpty() && (isOverwrite ||
!isExisted(instanceId))) {
-
repository.persist(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(labels));
- }
- }
-
- /**
- * Delete instance labels.
- *
- * @param instanceId instance id
- */
- public void deleteInstanceLabels(final String instanceId) {
- if (isExisted(instanceId)) {
-
repository.delete(ComputeNode.getInstanceLabelsNodePath(instanceId));
- }
- }
-
- private boolean isExisted(final String instanceId) {
- return
!Strings.isNullOrEmpty(repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId)));
- }
-
- /**
- * Persist instance worker id.
- *
- * @param instanceId instance id
- * @param workerId worker id
- */
- public void persistInstanceWorkerId(final String instanceId, final Long
workerId) {
-
repository.persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(workerId));
- }
-
- /**
- * Persist instance xa recovery id.
- *
- * @param instanceId instance id
- * @param xaRecoveryId xa recovery id
- */
- public void persistInstanceXaRecoveryId(final String instanceId, final
String xaRecoveryId) {
- loadXaRecoveryId(instanceId).ifPresent(each ->
repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each,
instanceId)));
-
repository.persist(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId,
instanceId), "");
- }
-
- /**
- * Load instance labels.
- *
- * @param instanceId instance id
- * @return labels
- */
- @SuppressWarnings("unchecked")
- public Collection<String> loadInstanceLabels(final String instanceId) {
- String yamlContent =
repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
- }
-
- /**
- * Load instance status.
- *
- * @param instanceId instance id
- * @return status
- */
- @SuppressWarnings("unchecked")
- public Collection<String> loadInstanceStatus(final String instanceId) {
- String yamlContent =
repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
- return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
- }
-
- /**
- * Load instance worker id.
- *
- * @param instanceId instance id
- * @return worker id
- */
- public Optional<Long> loadInstanceWorkerId(final String instanceId) {
- try {
- String workerId =
repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
- return Strings.isNullOrEmpty(workerId) ? Optional.empty() :
Optional.of(Long.valueOf(workerId));
- } catch (final NumberFormatException ex) {
- log.error("Invalid worker id for instance: {}", instanceId);
- }
- return Optional.empty();
- }
-
- /**
- * Load instance xa recovery id.
- *
- * @param instanceId instance id
- * @return xa recovery id
- */
- public Optional<String> loadXaRecoveryId(final String instanceId) {
- List<String> xaRecoveryIds =
repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
- for (String xaRecoveryId : xaRecoveryIds) {
- if (repository.getChildrenKeys(String.join("/",
ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
- return Optional.of(xaRecoveryId);
- }
- }
- return Optional.empty();
- }
-
- /**
- * Load all compute node instances.
- *
- * @return compute node instances
- */
- public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
- Collection<ComputeNodeInstance> result = new ArrayList<>();
- Arrays.stream(InstanceType.values()).forEach(instanceType -> {
- Collection<String> onlineComputeNodes =
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
- onlineComputeNodes.forEach(each ->
result.add(loadComputeNodeInstance(new InstanceDefinition(instanceType,
each))));
- });
- return result;
- }
-
- /**
- * Load compute node instance by instance definition.
- *
- * @param instanceDefinition instance definition
- * @return compute node instance
- */
- public ComputeNodeInstance loadComputeNodeInstance(final
InstanceDefinition instanceDefinition) {
- ComputeNodeInstance result = new
ComputeNodeInstance(instanceDefinition);
-
result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId().getId()));
-
result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId().getId()));
-
loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setWorkerId);
-
loadXaRecoveryId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setXaRecoveryId);
- return result;
- }
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
index 5cb4d668d04..6c0d0102ff7 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistServiceTest.java
@@ -26,11 +26,10 @@ import
org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import
org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
+import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPersistService;
-import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DatabaseRulePersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
import org.apache.shardingsphere.test.mock.MockedDataSource;
import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
@@ -52,9 +51,9 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import java.util.Properties;
import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -80,9 +79,6 @@ public final class MetaDataPersistServiceTest {
@Mock
private PropertiesPersistService propsService;
- @Mock
- private ComputeNodePersistService computeNodePersistService;
-
private MetaDataPersistService metaDataPersistService;
@Before
@@ -92,7 +88,6 @@ public final class MetaDataPersistServiceTest {
setField("databaseRulePersistService", databaseRulePersistService);
setField("globalRuleService", globalRuleService);
setField("propsService", propsService);
- setField("computeNodePersistService", computeNodePersistService);
}
private void setField(final String name, final Object value) throws
ReflectiveOperationException {
@@ -120,12 +115,6 @@ public final class MetaDataPersistServiceTest {
Collectors.toMap(Entry::getKey, entry ->
DataSourcePropertiesCreator.create(entry.getValue()), (oldValue, currentValue)
-> oldValue, LinkedHashMap::new));
}
- @Test
- public void assertPersistInstanceLabels() {
- metaDataPersistService.persistInstanceLabels("127.0.0.1@3307",
Collections.singletonList("foo_label"), false);
-
verify(computeNodePersistService).persistInstanceLabels("127.0.0.1@3307",
Collections.singletonList("foo_label"), false);
- }
-
private Map<String, DataSource> createDataSourceMap() {
Map<String, DataSource> result = new LinkedHashMap<>(2, 1);
result.put("ds_0", createDataSource("ds_0"));
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
deleted file mode 100644
index eea8cff4d05..00000000000
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistServiceTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.metadata.persist.service;
-
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.persist.PersistRepository;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ComputeNodePersistServiceTest {
-
- @Mock
- private PersistRepository repository;
-
- @Test
- public void assertPersistInstanceLabels() {
- ComputeNodePersistService computeNodePersistService = new
ComputeNodePersistService(repository);
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- computeNodePersistService.persistInstanceLabels(instanceId,
Collections.singletonList("test"), true);
- verify(repository,
times(1)).persist(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(Collections.singletonList("test")));
- computeNodePersistService.persistInstanceLabels(instanceId,
Collections.emptyList(), true);
- verify(repository,
times(0)).persist(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(Collections.emptyList()));
- }
-
- @Test
- public void assertPersistInstanceWorkerId() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new
ComputeNodePersistService(repository).persistInstanceWorkerId(instanceId, 100L);
-
verify(repository).persist(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(100L));
- }
-
- @Test
- public void assertPersistInstanceXaRecoveryId() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new
ComputeNodePersistService(repository).persistInstanceXaRecoveryId(instanceId,
instanceId);
-
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
-
verify(repository).persist(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId,
instanceId), "");
- }
-
- @Test
- public void assertLoadInstanceLabels() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new
ComputeNodePersistService(repository).loadInstanceLabels(instanceId);
-
verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceStatus() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new
ComputeNodePersistService(repository).loadInstanceStatus(instanceId);
-
verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceWorkerId() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new
ComputeNodePersistService(repository).loadInstanceWorkerId(instanceId);
-
verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
- }
-
- @Test
- public void assertLoadInstanceXaRecoveryId() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- final String instanceId = instanceDefinition.getInstanceId().getId();
- new ComputeNodePersistService(repository).loadXaRecoveryId(instanceId);
-
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
-
- }
-
- @Test
- public void assertLoadAllComputeNodeInstances() {
-
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("127.0.0.1@3307"));
-
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("127.0.0.1@3308"));
- Collection<ComputeNodeInstance> actual = new
ComputeNodePersistService(repository).loadAllComputeNodeInstances();
- assertThat(actual.size(), is(2));
- }
-
- @Test
- public void assertLoadComputeNodeInstance() {
- InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
- ComputeNodeInstance actual = new
ComputeNodePersistService(repository).loadComputeNodeInstance(instanceDefinition);
- assertThat(actual.getInstanceDefinition(), is(instanceDefinition));
- }
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 722a6fcc4fe..219c353ab25 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -72,9 +72,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
MetaDataContexts metaDataContexts =
createMetaDataContextsBuilder(metaDataPersistService,
parameter).build(metaDataPersistService);
persistMetaData(metaDataContexts);
Properties transactionProps =
getTransactionProperties(metaDataContexts);
- persistTransactionConfiguration(parameter, metaDataPersistService,
transactionProps);
- ContextManager result = createContextManager(repository,
metaDataPersistService, parameter.getInstanceDefinition(), metaDataContexts,
transactionProps, parameter.getModeConfig());
- registerOnline(metaDataPersistService,
parameter.getInstanceDefinition(), result, registryCenter);
+ persistTransactionConfiguration(metaDataPersistService,
transactionProps);
+ ContextManager result = createContextManager(repository,
registryCenter, parameter.getInstanceDefinition(), metaDataContexts,
transactionProps, parameter.getModeConfig());
+ registerOnline(metaDataPersistService, parameter, result,
registryCenter);
return result;
}
@@ -83,7 +83,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
if (!parameter.isEmpty()) {
metaDataPersistService.persistConfigurations(parameter.getDatabaseConfigs(),
parameter.getGlobalRuleConfigs(), parameter.getProps(), isOverwrite);
}
-
metaDataPersistService.persistInstanceLabels(parameter.getInstanceDefinition().getInstanceId().getId(),
parameter.getLabels(), isOverwrite);
}
private MetaDataContextsBuilder createMetaDataContextsBuilder(final
MetaDataPersistService metaDataPersistService, final
ContextManagerBuilderParameter parameter) {
@@ -118,14 +117,10 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
return result;
}
- private void persistTransactionConfiguration(final
ContextManagerBuilderParameter parameter, final MetaDataPersistService
metaDataPersistService, final Properties transactionProps) {
+ private void persistTransactionConfiguration(final MetaDataPersistService
metaDataPersistService, final Properties transactionProps) {
if (!transactionProps.isEmpty()) {
metaDataPersistService.persistTransactionRule(transactionProps,
true);
}
- String instanceId =
parameter.getInstanceDefinition().getInstanceId().getId();
- if
(!metaDataPersistService.getComputeNodePersistService().loadXaRecoveryId(instanceId).isPresent())
{
-
metaDataPersistService.getComputeNodePersistService().persistInstanceXaRecoveryId(instanceId,
instanceId);
- }
}
private Map<String, DatabaseConfiguration> getDatabaseConfigMap(final
Collection<String> databaseNames, final MetaDataPersistService
metaDataPersistService,
@@ -147,13 +142,12 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
.forEach((schemaName, tables) ->
metaDataContexts.getPersistService().ifPresent(optional ->
optional.getSchemaMetaDataService().persistMetaData(databaseName, schemaName,
tables))));
}
- private ContextManager createContextManager(final ClusterPersistRepository
repository, final MetaDataPersistService metaDataPersistService,
+ private ContextManager createContextManager(final ClusterPersistRepository
repository, final RegistryCenter registryCenter,
final InstanceDefinition
instanceDefinition, final MetaDataContexts metaDataContexts,
final Properties
transactionProps, final ModeConfiguration modeConfig) {
- ComputeNodeInstance computeNodeInstance =
metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition);
- ClusterWorkerIdGenerator clusterWorkerIdGenerator = new
ClusterWorkerIdGenerator(repository, metaDataPersistService,
instanceDefinition);
+ ClusterWorkerIdGenerator clusterWorkerIdGenerator = new
ClusterWorkerIdGenerator(repository, registryCenter, instanceDefinition);
DistributeLockContext distributeLockContext = new
DistributeLockContext(repository);
- InstanceContext instanceContext = new
InstanceContext(computeNodeInstance, clusterWorkerIdGenerator, modeConfig,
distributeLockContext);
+ InstanceContext instanceContext = new InstanceContext(new
ComputeNodeInstance(instanceDefinition), clusterWorkerIdGenerator, modeConfig,
distributeLockContext);
repository.watchSessionConnection(instanceContext);
generateTransactionConfigurationFile(instanceContext,
metaDataContexts, transactionProps);
TransactionContexts transactionContexts = new
TransactionContextsBuilder(
@@ -170,11 +164,14 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
}
}
- private void registerOnline(final MetaDataPersistService
metaDataPersistService, final InstanceDefinition instanceDefinition, final
ContextManager contextManager,
+ private void registerOnline(final MetaDataPersistService
metaDataPersistService, final ContextManagerBuilderParameter parameter, final
ContextManager contextManager,
final RegistryCenter registryCenter) {
+ String instanceId =
contextManager.getInstanceContext().getInstance().getCurrentInstanceId();
+
contextManager.getInstanceContext().getInstance().setXaRecoveryId(instanceId);
+
contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
+
contextManager.getInstanceContext().getComputeNodeInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
new ClusterContextManagerCoordinator(metaDataPersistService,
contextManager, registryCenter);
-
contextManager.getInstanceContext().getComputeNodeInstances().addAll(metaDataPersistService.getComputeNodePersistService().loadAllComputeNodeInstances());
- registryCenter.onlineInstance(instanceDefinition);
+
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
}
@Override
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 36b29700913..3ea25752e9b 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -238,6 +238,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final LabelsEvent event) {
+ // TODO labels may be empty
contextManager.getInstanceContext().updateLabel(event.getInstanceId(),
event.getLabels());
}
@@ -260,7 +261,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final InstanceOnlineEvent event) {
- ComputeNodeInstance instance =
metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceDefinition());
+ ComputeNodeInstance instance = new
ComputeNodeInstance(event.getInstanceDefinition());
contextManager.getInstanceContext().addComputeNodeInstance(instance);
}
@@ -271,7 +272,7 @@ public final class ClusterContextManagerCoordinator {
*/
@Subscribe
public synchronized void renew(final InstanceOfflineEvent event) {
-
contextManager.getInstanceContext().deleteComputeNodeInstance(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(event.getInstanceDefinition()));
+ contextManager.getInstanceContext().deleteComputeNodeInstance(new
ComputeNodeInstance(event.getInstanceDefinition()));
}
/**
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 8e927bcd367..edde2f881b8 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import lombok.Getter;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.LockRegistryService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.MutexLockRegistryService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
@@ -70,10 +70,12 @@ public final class RegistryCenter {
/**
* Online instance.
*
- * @param instanceDefinition instance definition
+ * @param computeNodeInstance compute node instance
*/
- public void onlineInstance(final InstanceDefinition instanceDefinition) {
- computeNodeStatusService.registerOnline(instanceDefinition);
+ public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
+
computeNodeStatusService.registerOnline(computeNodeInstance.getInstanceDefinition());
+
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getLabels());
+
computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getXaRecoveryId());
listenerFactory.watchListeners();
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 5715e3cef56..36178bff7e9 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -17,15 +17,27 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
+import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.definition.InstanceType;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
/**
* Compute node status service.
*/
@RequiredArgsConstructor
+@Slf4j
public final class ComputeNodeStatusService {
private final ClusterPersistRepository repository;
@@ -38,4 +50,137 @@ public final class ComputeNodeStatusService {
public void registerOnline(final InstanceDefinition instanceDefinition) {
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(instanceDefinition.getInstanceId().getId(),
instanceDefinition.getInstanceType()), "");
}
+
+ /**
+ * Persist instance labels.
+ *
+ * @param instanceId instance id
+ * @param labels collection of label
+ */
+ public void persistInstanceLabels(final String instanceId, final
Collection<String> labels) {
+ if (null != labels && !labels.isEmpty()) {
+
repository.persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(labels));
+ }
+ }
+
+ /**
+ * Delete instance labels.
+ *
+ * @param instanceId instance id
+ */
+ public void deleteInstanceLabels(final String instanceId) {
+ if (isExisted(instanceId)) {
+
repository.delete(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ }
+ }
+
+ private boolean isExisted(final String instanceId) {
+ return
!Strings.isNullOrEmpty(repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId)));
+ }
+
+ /**
+ * Persist instance worker id.
+ *
+ * @param instanceId instance id
+ * @param workerId worker id
+ */
+ public void persistInstanceWorkerId(final String instanceId, final Long
workerId) {
+
repository.persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(workerId));
+ }
+
+ /**
+ * Persist instance xa recovery id.
+ *
+ * @param instanceId instance id
+ * @param xaRecoveryId xa recovery id
+ */
+ public void persistInstanceXaRecoveryId(final String instanceId, final
String xaRecoveryId) {
+ loadXaRecoveryId(instanceId).ifPresent(each ->
repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each,
instanceId)));
+
repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(xaRecoveryId,
instanceId), "");
+ }
+
+ /**
+ * Load instance labels.
+ *
+ * @param instanceId instance id
+ * @return labels
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<String> loadInstanceLabels(final String instanceId) {
+ String yamlContent =
repository.get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
+ }
+
+ /**
+ * Load instance status.
+ *
+ * @param instanceId instance id
+ * @return status
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<String> loadInstanceStatus(final String instanceId) {
+ String yamlContent =
repository.get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ return Strings.isNullOrEmpty(yamlContent) ? new ArrayList<>() :
YamlEngine.unmarshal(yamlContent, Collection.class);
+ }
+
+ /**
+ * Load instance worker id.
+ *
+ * @param instanceId instance id
+ * @return worker id
+ */
+ public Optional<Long> loadInstanceWorkerId(final String instanceId) {
+ try {
+ String workerId =
repository.get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+ return Strings.isNullOrEmpty(workerId) ? Optional.empty() :
Optional.of(Long.valueOf(workerId));
+ } catch (final NumberFormatException ex) {
+ log.error("Invalid worker id for instance: {}", instanceId);
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Load instance xa recovery id.
+ *
+ * @param instanceId instance id
+ * @return xa recovery id
+ */
+ public Optional<String> loadXaRecoveryId(final String instanceId) {
+ List<String> xaRecoveryIds =
repository.getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+ for (String xaRecoveryId : xaRecoveryIds) {
+ if (repository.getChildrenKeys(String.join("/",
ComputeNode.getXaRecoveryIdNodePath(), xaRecoveryId)).contains(instanceId)) {
+ return Optional.of(xaRecoveryId);
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Load all compute node instances.
+ *
+ * @return compute node instances
+ */
+ public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+ Collection<ComputeNodeInstance> result = new ArrayList<>();
+ Arrays.stream(InstanceType.values()).forEach(instanceType -> {
+ Collection<String> onlineComputeNodes =
repository.getChildrenKeys(ComputeNode.getOnlineNodePath(instanceType));
+ onlineComputeNodes.forEach(each ->
result.add(loadComputeNodeInstance(new InstanceDefinition(instanceType,
each))));
+ });
+ return result;
+ }
+
+ /**
+ * Load compute node instance by instance definition.
+ *
+ * @param instanceDefinition instance definition
+ * @return compute node instance
+ */
+ public ComputeNodeInstance loadComputeNodeInstance(final
InstanceDefinition instanceDefinition) {
+ ComputeNodeInstance result = new
ComputeNodeInstance(instanceDefinition);
+
result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId().getId()));
+
result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId().getId()));
+
loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setWorkerId);
+
loadXaRecoveryId(instanceDefinition.getInstanceId().getId()).ifPresent(result::setXaRecoveryId);
+ return result;
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index 0337c42be2d..944fb41cba3 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdNode;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Optional;
@@ -34,18 +34,18 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
private final ClusterPersistRepository repository;
- private final MetaDataPersistService metaDataPersistService;
+ private final RegistryCenter registryCenter;
private final InstanceDefinition instanceDefinition;
@Override
public long generate() {
- return
metaDataPersistService.getComputeNodePersistService().loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).orElseGet(this::reGenerate);
+ return
registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceDefinition.getInstanceId().getId()).orElseGet(this::reGenerate);
}
private Long reGenerate() {
Long result =
Long.valueOf(Optional.ofNullable(repository.getSequentialId(WorkerIdNode.getWorkerIdGeneratorPath(instanceDefinition.getInstanceId().getId()),
"")).orElse("0"));
-
metaDataPersistService.getComputeNodePersistService().persistInstanceWorkerId(instanceDefinition.getInstanceId().getId(),
result);
+
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceDefinition.getInstanceId().getId(),
result);
return result;
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 443b675ec23..da966e03c1a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -335,13 +335,11 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewInstanceOnlineEvent() {
InstanceDefinition instanceDefinition1 = new
InstanceDefinition(InstanceType.PROXY, "online_instance_id@1");
- InstanceDefinition instanceDefinition2 = new
InstanceDefinition(InstanceType.PROXY, "online_instance_id@2");
-
when(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition1)).thenReturn(new
ComputeNodeInstance(instanceDefinition1));
-
when(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(instanceDefinition2)).thenReturn(new
ComputeNodeInstance(instanceDefinition2));
InstanceOnlineEvent instanceOnlineEvent1 = new
InstanceOnlineEvent(instanceDefinition1);
coordinator.renew(instanceOnlineEvent1);
assertThat(contextManager.getInstanceContext().getComputeNodeInstances().size(),
is(1));
assertThat(((LinkedList<ComputeNodeInstance>)
contextManager.getInstanceContext().getComputeNodeInstances()).get(0).getInstanceDefinition(),
is(instanceDefinition1));
+ InstanceDefinition instanceDefinition2 = new
InstanceDefinition(InstanceType.PROXY, "online_instance_id@2");
InstanceOnlineEvent instanceOnlineEvent2 = new
InstanceOnlineEvent(instanceDefinition2);
coordinator.renew(instanceOnlineEvent2);
assertThat(contextManager.getInstanceContext().getComputeNodeInstances().size(),
is(2));
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServi
[...]
index 2bb5688761e..0aac987ee72 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -17,15 +17,25 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class ComputeNodeStatusServiceTest {
@@ -39,4 +49,80 @@ public final class ComputeNodeStatusServiceTest {
new
ComputeNodeStatusService(repository).registerOnline(instanceDefinition);
verify(repository).persistEphemeral("/nodes/compute_nodes/online/proxy/" +
instanceDefinition.getInstanceId().getId(), "");
}
+
+ @Test
+ public void assertPersistInstanceLabels() {
+ ComputeNodeStatusService computeNodeStatusService = new
ComputeNodeStatusService(repository);
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ computeNodeStatusService.persistInstanceLabels(instanceId,
Collections.singletonList("test"));
+ verify(repository,
times(1)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(Collections.singletonList("test")));
+ computeNodeStatusService.persistInstanceLabels(instanceId,
Collections.emptyList());
+ verify(repository,
times(0)).persistEphemeral(ComputeNode.getInstanceLabelsNodePath(instanceId),
YamlEngine.marshal(Collections.emptyList()));
+ }
+
+ @Test
+ public void assertPersistInstanceWorkerId() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new
ComputeNodeStatusService(repository).persistInstanceWorkerId(instanceId, 100L);
+
verify(repository).persistEphemeral(ComputeNode.getInstanceWorkerIdNodePath(instanceId),
String.valueOf(100L));
+ }
+
+ @Test
+ public void assertPersistInstanceXaRecoveryId() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new
ComputeNodeStatusService(repository).persistInstanceXaRecoveryId(instanceId,
instanceId);
+
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+
verify(repository).persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(instanceId,
instanceId), "");
+ }
+
+ @Test
+ public void assertLoadInstanceLabels() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new
ComputeNodeStatusService(repository).loadInstanceLabels(instanceId);
+
verify(repository).get(ComputeNode.getInstanceLabelsNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceStatus() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new
ComputeNodeStatusService(repository).loadInstanceStatus(instanceId);
+
verify(repository).get(ComputeNode.getInstanceStatusNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceWorkerId() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new
ComputeNodeStatusService(repository).loadInstanceWorkerId(instanceId);
+
verify(repository).get(ComputeNode.getInstanceWorkerIdNodePath(instanceId));
+ }
+
+ @Test
+ public void assertLoadInstanceXaRecoveryId() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ final String instanceId = instanceDefinition.getInstanceId().getId();
+ new ComputeNodeStatusService(repository).loadXaRecoveryId(instanceId);
+
verify(repository).getChildrenKeys(ComputeNode.getXaRecoveryIdNodePath());
+
+ }
+
+ @Test
+ public void assertLoadAllComputeNodeInstances() {
+
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("127.0.0.1@3307"));
+
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("127.0.0.1@3308"));
+ Collection<ComputeNodeInstance> actual = new
ComputeNodeStatusService(repository).loadAllComputeNodeInstances();
+ assertThat(actual.size(), is(2));
+ }
+
+ @Test
+ public void assertLoadComputeNodeInstance() {
+ InstanceDefinition instanceDefinition = new
InstanceDefinition(InstanceType.PROXY, 3307);
+ ComputeNodeInstance actual = new
ComputeNodeStatusService(repository).loadComputeNodeInstance(instanceDefinition);
+ assertThat(actual.getInstanceDefinition(), is(instanceDefinition));
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger
name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext"
level="error" />
- <logger
name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService"
level="off" />
+ <logger
name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService"
level="off" />
<root>
<level value="error" />
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index b88302d74b9..7ce473af11d 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import
org.apache.shardingsphere.infra.config.database.impl.DataSourceProvidedDatabaseConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.infra.rule.identifier.type.InstanceAwareRule;
@@ -56,7 +57,7 @@ public final class StandaloneContextManagerBuilder implements
ContextManagerBuil
MetaDataPersistService metaDataPersistService = new
MetaDataPersistService(StandalonePersistRepositoryFactory.getInstance(parameter.getModeConfig().getRepository()));
persistConfigurations(metaDataPersistService, parameter);
MetaDataContexts metaDataContexts =
createMetaDataContextsBuilder(metaDataPersistService,
parameter).build(metaDataPersistService);
- return createContextManager(metaDataPersistService, parameter,
metaDataContexts);
+ return createContextManager(parameter, metaDataContexts);
}
private void persistConfigurations(final MetaDataPersistService
metaDataPersistService, final ContextManagerBuilderParameter parameter) {
@@ -89,9 +90,9 @@ public final class StandaloneContextManagerBuilder implements
ContextManagerBuil
return new DataSourceProvidedDatabaseConfiguration(dataSources,
databaseRuleConfigs);
}
- private ContextManager createContextManager(final MetaDataPersistService
metaDataPersistService, final ContextManagerBuilderParameter parameter, final
MetaDataContexts metaDataContexts) {
- InstanceContext instanceContext = new
InstanceContext(metaDataPersistService.getComputeNodePersistService().loadComputeNodeInstance(parameter.getInstanceDefinition()),
- new StandaloneWorkerIdGenerator(), parameter.getModeConfig(),
new StandaloneLockContext());
+ private ContextManager createContextManager(final
ContextManagerBuilderParameter parameter, final MetaDataContexts
metaDataContexts) {
+ InstanceContext instanceContext = new InstanceContext(new
ComputeNodeInstance(parameter.getInstanceDefinition()), new
StandaloneWorkerIdGenerator(), parameter.getModeConfig(),
+ new StandaloneLockContext());
generateTransactionConfigurationFile(instanceContext,
metaDataContexts);
TransactionContexts transactionContexts = new
TransactionContextsBuilder(
metaDataContexts.getMetaData().getDatabases(),
metaDataContexts.getMetaData().getGlobalRuleMetaData().getRules()).build();
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
index 00559e3410c..5196f987673 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/test/resources/logback-test.xml
@@ -25,7 +25,7 @@
<logger name="org.apache.shardingsphere" level="warn" additivity="false">
<appender-ref ref="console" />
</logger>
- <logger
name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService"
level="off" />
+ <logger
name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService"
level="off" />
<root>
<level value="error" />
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
index 0bd8e33016e..0a5c7e73151 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/AlterInstanceHandler.java
@@ -19,14 +19,13 @@ package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import
org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.AlterInstanceStatement;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
-import java.util.Collection;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Alter instance handler.
@@ -49,15 +48,10 @@ public final class AlterInstanceHandler extends
UpdatableRALBackendHandler<Alter
if (!persistService.isPresent()) {
throw new UnsupportedOperationException(String.format("No
persistence configuration found, unable to set '%s'", sqlStatement.getKey()));
}
- Collection<ComputeNodeInstance> instances =
persistService.get().getComputeNodePersistService().loadAllComputeNodeInstances();
- checkExisted(instances, sqlStatement);
-
persistService.get().getComputeNodePersistService().persistInstanceXaRecoveryId(sqlStatement.getInstanceId(),
sqlStatement.getValue());
- }
-
- private void checkExisted(final Collection<ComputeNodeInstance> instances,
final AlterInstanceStatement sqlStatement) {
- Collection<String> instanceIds = instances.stream().map(each ->
each.getInstanceDefinition().getInstanceId().getId()).collect(Collectors.toSet());
- if (!instanceIds.contains(sqlStatement.getInstanceId())) {
+ if
(!contextManager.getInstanceContext().getComputeNodeInstanceById(sqlStatement.getInstanceId()).isPresent())
{
throw new UnsupportedOperationException(String.format("'%s' does
not exist", sqlStatement.getInstanceId()));
}
+ // TODO need support standalone mode
+ ShardingSphereEventBus.getInstance().post(new
XaRecoveryIdEvent(sqlStatement.getInstanceId(), sqlStatement.getValue()));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
index d18d324f18a..563ea75a4c6 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/LabelInstanceHandler.java
@@ -18,11 +18,11 @@
package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatable;
import
org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.LabelInstanceStatement;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBack
import java.util.Collection;
import java.util.LinkedHashSet;
+import java.util.Optional;
/**
* Label instance handler.
@@ -43,11 +44,13 @@ public final class LabelInstanceHandler extends
UpdatableRALBackendHandler<Label
throw new UnsupportedOperationException("Labels can only be added
in cluster mode");
}
String instanceId = new InstanceId(sqlStatement.getIp(),
String.valueOf(sqlStatement.getPort())).getId();
- ComputeNodeInstance instances =
persistService.getComputeNodePersistService().loadComputeNodeInstance(new
InstanceDefinition(InstanceType.PROXY, instanceId));
- Collection<String> labels = new
LinkedHashSet<>(sqlStatement.getLabels());
- if (!sqlStatement.isOverwrite()) {
- labels.addAll(instances.getLabels());
+ Optional<ComputeNodeInstance> computeNodeInstance =
contextManager.getInstanceContext().getComputeNodeInstanceById(instanceId);
+ if (computeNodeInstance.isPresent()) {
+ Collection<String> labels = new
LinkedHashSet<>(sqlStatement.getLabels());
+ if (!sqlStatement.isOverwrite() && null !=
computeNodeInstance.get().getLabels()) {
+ labels.addAll(computeNodeInstance.get().getLabels());
+ }
+ ShardingSphereEventBus.getInstance().post(new
LabelsEvent(instanceId, labels));
}
-
persistService.getComputeNodePersistService().persistInstanceLabels(instanceId,
labels, true);
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
index 2162c85f747..4761e85ce1e 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/SetInstanceStatusHandler.java
@@ -19,21 +19,13 @@ package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import
org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.SetInstanceStatusStatement;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
-import java.util.Collection;
-import java.util.Optional;
-
/**
* Set instance status handler.
*/
@@ -47,49 +39,29 @@ public final class SetInstanceStatusHandler extends
UpdatableRALBackendHandler<S
InstanceId operationInstanceId = new InstanceId(sqlStatement.getIp(),
String.valueOf(sqlStatement.getPort()));
boolean isDisable = "DISABLE".equals(sqlStatement.getStatus());
if (isDisable) {
- checkDisablingIsValid(operationInstanceId);
+ checkDisablingIsValid(contextManager, operationInstanceId);
} else {
- checkEnablingIsValid(operationInstanceId);
+ checkEnablingIsValid(contextManager, operationInstanceId);
}
ShardingSphereEventBus.getInstance().post(new
ComputeNodeStatusChangedEvent(isDisable ? ComputeNodeStatus.CIRCUIT_BREAK :
ComputeNodeStatus.ONLINE,
sqlStatement.getIp(), sqlStatement.getPort()));
}
- private void checkEnablingIsValid(final InstanceId operationInstanceId) {
- checkExist(operationInstanceId);
+ private void checkEnablingIsValid(final ContextManager contextManager,
final InstanceId operationInstanceId) {
+ if
(!contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).isPresent())
{
+ throw new UnsupportedOperationException(String.format("`%s` does
not exist", operationInstanceId.getId()));
+ }
}
- private void checkDisablingIsValid(final InstanceId operationInstanceId) {
- InstanceContext instanceContext =
ProxyContext.getInstance().getContextManager().getInstanceContext();
- if
(isIdenticalInstance(instanceContext.getInstance().getInstanceDefinition(),
operationInstanceId)) {
+ private void checkDisablingIsValid(final ContextManager contextManager,
final InstanceId operationInstanceId) {
+ if
(contextManager.getInstanceContext().getInstance().getCurrentInstanceId().equals(operationInstanceId.getId()))
{
throw new UnsupportedOperationException(String.format("`%s` is the
currently in use instance and cannot be disabled",
operationInstanceId.getId()));
}
- checkExist(operationInstanceId);
- checkExistDisabled(operationInstanceId);
- }
-
- private void checkExistDisabled(final InstanceId operationInstanceId) {
- Optional<MetaDataPersistService> metaDataPersistService =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getPersistService();
- if (metaDataPersistService.isPresent()) {
-
metaDataPersistService.get().getComputeNodePersistService().loadAllComputeNodeInstances().forEach(each
-> {
- if (StateType.CIRCUIT_BREAK ==
each.getState().getCurrentState() &&
isIdenticalInstance(each.getInstanceDefinition(), operationInstanceId)) {
- throw new
UnsupportedOperationException(String.format("`%s` compute node has been
disabled", operationInstanceId.getId()));
- }
- });
+ if
(!contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).isPresent())
{
+ throw new UnsupportedOperationException(String.format("`%s` does
not exist", operationInstanceId.getId()));
+ }
+ if
(contextManager.getInstanceContext().getComputeNodeInstanceById(operationInstanceId.getId()).get().getState().getCurrentState()
== StateType.CIRCUIT_BREAK) {
+ throw new UnsupportedOperationException(String.format("`%s`
compute node has been disabled", operationInstanceId.getId()));
}
- }
-
- private void checkExist(final InstanceId operationInstanceId) {
- Optional<MetaDataPersistService> metaDataPersistService =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getPersistService();
- metaDataPersistService.ifPresent(optional -> {
- Collection<ComputeNodeInstance> computeNodeInstances =
optional.getComputeNodePersistService().loadAllComputeNodeInstances();
- if (computeNodeInstances.stream().noneMatch(each ->
isIdenticalInstance(each.getInstanceDefinition(), operationInstanceId))) {
- throw new UnsupportedOperationException(String.format("`%s`
does not exist", operationInstanceId.getId()));
- }
- });
- }
-
- private boolean isIdenticalInstance(final InstanceDefinition definition,
final InstanceId instanceId) {
- return definition.getInstanceId().getId().equals(instanceId.getId());
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
index b584e9249d5..78294bbbbe7 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/updatable/UnlabelInstanceHandler.java
@@ -19,18 +19,20 @@ package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.updatabl
import
org.apache.shardingsphere.distsql.parser.statement.ral.common.updatable.UnlabelInstanceStatement;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
import org.apache.shardingsphere.infra.instance.definition.InstanceId;
-import org.apache.shardingsphere.infra.instance.definition.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.text.distsql.ral.UpdatableRALBackendHandler;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashSet;
+import java.util.Optional;
/**
* Unlabel instance handler.
@@ -44,13 +46,15 @@ public final class UnlabelInstanceHandler extends
UpdatableRALBackendHandler<Unl
throw new UnsupportedOperationException("Labels can only be
removed in cluster mode");
}
String instanceId = new InstanceId(sqlStatement.getIp(),
String.valueOf(sqlStatement.getPort())).getId();
- ComputeNodeInstance instances =
persistService.getComputeNodePersistService().loadComputeNodeInstance(new
InstanceDefinition(InstanceType.PROXY, instanceId));
- Collection<String> labels = new LinkedHashSet<>(instances.getLabels());
- if (sqlStatement.getLabels().isEmpty()) {
-
persistService.getComputeNodePersistService().deleteInstanceLabels(instanceId);
- } else {
- labels.removeAll(sqlStatement.getLabels());
-
persistService.getComputeNodePersistService().persistInstanceLabels(instanceId,
labels, true);
+ Optional<ComputeNodeInstance> computeNodeInstance =
contextManager.getInstanceContext().getComputeNodeInstanceById(instanceId);
+ if (computeNodeInstance.isPresent()) {
+ Collection<String> labels = new
LinkedHashSet<>(computeNodeInstance.get().getLabels());
+ if (sqlStatement.getLabels().isEmpty()) {
+ ShardingSphereEventBus.getInstance().post(new
LabelsEvent(instanceId, Collections.EMPTY_LIST));
+ } else {
+ labels.removeAll(sqlStatement.getLabels());
+ ShardingSphereEventBus.getInstance().post(new
LabelsEvent(instanceId, labels));
+ }
}
}
}
diff --git
a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
index a7207c70eba..064918a0829 100644
---
a/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
+++
b/shardingsphere-test/shardingsphere-integration-driver-test/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
<appender-ref ref="console" />
</logger>
<logger
name="org.apache.shardingsphere.schedule.core.api.ModeScheduleContext"
level="error" />
- <logger
name="org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService"
level="off" />
+ <logger
name="org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService"
level="off" />
<root>
<level value="error" />