This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2150e7a refactor compute node of registry center (#14516)
2150e7a is described below
commit 2150e7a9e2e40ab68d0004ed7b2125e5eb21c444
Author: Haoran Meng <[email protected]>
AuthorDate: Wed Jan 5 08:57:12 2022 +0800
refactor compute node of registry center (#14516)
Co-authored-by: menghaoran <[email protected]>
---
.../infra/instance/ComputeNodeInstance.java | 40 ++++++++++++
.../metadata/persist/MetaDataPersistService.java | 39 +++++++++++
.../mode/metadata/persist/node/ComputeNode.java | 65 ++++++++++++++++++
.../persist/service/ComputeNodePersistService.java | 76 ++++++++++++++++++++++
.../cluster/ClusterContextManagerBuilder.java | 8 +++
.../compute/service/ComputeNodeStatusService.java | 5 +-
.../service/ComputeNodeStatusServiceTest.java | 2 +-
7 files changed, 231 insertions(+), 4 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
new file mode 100644
index 0000000..036b3c6
--- /dev/null
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.instance;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+
+import java.util.Collection;
+
+/**
+ * Instance of compute node.
+ *
+ * <p>Mutable value holder; Lombok generates a getter and a setter for every field.</p>
+ */
+@Getter
+@Setter
+public final class ComputeNodeInstance {
+
+ // IP of the compute node, kept as text.
+ private String ip;
+
+ // Port of the compute node, kept as text rather than as a number.
+ private String port;
+
+ // Users associated with this compute node.
+ private Collection<ShardingSphereUser> users;
+
+ // Labels attached to this compute node.
+ private Collection<String> labels;
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index a651995..e6a6835 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -18,8 +18,12 @@
package org.apache.shardingsphere.mode.metadata.persist;
import lombok.Getter;
+import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import
org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
@@ -27,9 +31,11 @@ import
org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPe
import
org.apache.shardingsphere.mode.metadata.persist.service.impl.SchemaRulePersistService;
import org.apache.shardingsphere.mode.persist.PersistRepository;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
/**
@@ -50,6 +56,8 @@ public final class MetaDataPersistService {
private final PropertiesPersistService propsService;
+ private final ComputeNodePersistService computeNodePersistService;
+
public MetaDataPersistService(final PersistRepository repository) {
this.repository = repository;
dataSourceService = new DataSourcePersistService(repository);
@@ -57,6 +65,7 @@ public final class MetaDataPersistService {
schemaRuleService = new SchemaRulePersistService(repository);
globalRuleService = new GlobalRulePersistService(repository);
propsService = new PropertiesPersistService(repository);
+ computeNodePersistService = new ComputeNodePersistService(repository);
}
/**
@@ -78,4 +87,34 @@ public final class MetaDataPersistService {
schemaRuleService.persist(schemaName,
schemaRuleConfigs.get(schemaName), isOverwrite);
}
}
+
+ /**
+ * Persist instance configurations.
+ *
+ * <p>Delegates to the compute node persist service, which stores the labels of the given instance.</p>
+ *
+ * @param instanceId instance id
+ * @param labels collection of label
+ */
+ public void persistInstanceConfigurations(final String instanceId, final
Collection<String> labels) {
+ computeNodePersistService.persistInstanceLabels(instanceId, labels);
+ }
+
+    /**
+     * Load compute node instances.
+     *
+     * <p>Loads every compute node instance known to the compute node persist service and assigns to each of them
+     * the users of the first {@link AuthorityRuleConfiguration} found among the global rules. When no such
+     * configuration exists the instances receive an empty user collection. All returned instances share the
+     * same user collection object.</p>
+     *
+     * <p>NOTE(review): {@code labels} is accepted but is not applied as a filter in this method;
+     * confirm whether filtering by label is intended.</p>
+     *
+     * @param labels collection of label
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadComputeNodeInstances(final Collection<String> labels) {
+        Collection<ComputeNodeInstance> instances = computeNodePersistService.loadAllComputeNodeInstances();
+        if (instances.isEmpty()) {
+            // Nothing to decorate, so the global rules are not loaded at all.
+            return instances;
+        }
+        Collection<ShardingSphereUser> sharedUsers = new ArrayList<>();
+        globalRuleService.load().stream()
+                .filter(AuthorityRuleConfiguration.class::isInstance)
+                .map(AuthorityRuleConfiguration.class::cast)
+                .findFirst()
+                .ifPresent(authorityConfig -> sharedUsers.addAll(authorityConfig.getUsers()));
+        for (ComputeNodeInstance each : instances) {
+            each.setUsers(sharedUsers);
+        }
+        return instances;
+    }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
new file mode 100644
index 0000000..a2a197b
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.node;
+
+/**
+ * Compute node.
+ *
+ * <p>Static helper that builds the registry paths used for compute nodes.
+ * Every path is absolute and starts with {@code /nodes/compute_nodes}.</p>
+ */
+public final class ComputeNode {
+    
+    private static final String ROOT_NODE = "nodes";
+    
+    private static final String COMPUTE_NODE = "compute_nodes";
+    
+    private static final String ONLINE_NODE = "online";
+    
+    private static final String ATTRIBUTES_NODE = "attributes";
+    
+    private static final String LABEL_NODE = "label";
+    
+    // Not referenced by any path builder in this class yet.
+    private static final String STATUS_NODE = "status";
+    
+    // Utility class with static members only: prevent instantiation.
+    private ComputeNode() {
+    }
+    
+    /**
+     * Get online compute node path.
+     *
+     * @return path of online compute node, {@code /nodes/compute_nodes/online}
+     */
+    public static String getOnlineNodePath() {
+        // The leading empty element makes the joined path start with "/".
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE);
+    }
+    
+    /**
+     * Get online compute node instance path.
+     *
+     * @param instanceId instance id
+     * @return path of online compute node instance, {@code /nodes/compute_nodes/online/<instanceId>}
+     */
+    public static String getOnlineInstanceNodePath(final String instanceId) {
+        return String.join("/", getOnlineNodePath(), instanceId);
+    }
+    
+    /**
+     * Get online compute node instance label path.
+     *
+     * @param instanceId instance id
+     * @return path of compute node instance label, {@code /nodes/compute_nodes/attributes/<instanceId>/label}
+     */
+    public static String getInstanceLabelNodePath(final String instanceId) {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, LABEL_NODE);
+    }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
new file mode 100644
index 0000000..09e2568
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service;
+
+import com.google.common.base.Splitter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Compute node persist service.
+ *
+ * <p>Reads and writes compute node data through the given {@link PersistRepository},
+ * using the paths built by {@link ComputeNode}.</p>
+ */
+@RequiredArgsConstructor
+public final class ComputeNodePersistService {
+    
+    private final PersistRepository repository;
+    
+    /**
+     * Persist instance labels.
+     *
+     * <p>The labels are marshalled to YAML and stored under the label path of the instance.</p>
+     *
+     * @param instanceId instance id
+     * @param labels collection of label
+     */
+    public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
+        repository.persist(ComputeNode.getInstanceLabelNodePath(instanceId), YamlEngine.marshal(labels));
+    }
+    
+    /**
+     * Load instance labels.
+     *
+     * @param instanceId instance id
+     * @return collection of label, empty when nothing is stored for the instance
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<String> loadInstanceLabels(final String instanceId) {
+        String yamlContent = repository.get(ComputeNode.getInstanceLabelNodePath(instanceId));
+        // An instance may have no label value stored; answer with an empty collection
+        // instead of handing a missing or empty value to the YAML engine.
+        if (null == yamlContent || yamlContent.isEmpty()) {
+            return new ArrayList<>();
+        }
+        // Safe as long as the stored value was written by persistInstanceLabels, which marshals a Collection<String>.
+        return YamlEngine.unmarshal(yamlContent, Collection.class);
+    }
+    
+    /**
+     * Load all compute node instances.
+     *
+     * <p>One instance is built per child key of the online node path. Each key is split on {@code @};
+     * the first part becomes the IP and the second part the port.</p>
+     *
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+        Collection<String> onlineInstanceIds = repository.getChildrenKeys(ComputeNode.getOnlineNodePath());
+        List<ComputeNodeInstance> result = new ArrayList<>(onlineInstanceIds.size());
+        for (String each : onlineInstanceIds) {
+            // Split once and reuse both parts.
+            List<String> ipAndPort = Splitter.on("@").splitToList(each);
+            ComputeNodeInstance instance = new ComputeNodeInstance();
+            instance.setIp(ipAndPort.get(0));
+            instance.setPort(ipAndPort.get(1));
+            instance.setLabels(loadInstanceLabels(each));
+            result.add(instance);
+        }
+        return result;
+    }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 6b3ab9b..e606307 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
+import org.apache.shardingsphere.infra.instance.Instance;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
@@ -91,6 +92,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
ModeScheduleContextFactory.getInstance().init(parameter.getModeConfig());
metaDataPersistService = new MetaDataPersistService(repository);
persistConfigurations(metaDataPersistService,
parameter.getDataSourcesMap(), parameter.getSchemaRuleConfigs(),
parameter.getGlobalRuleConfigs(), parameter.getProps(),
parameter.isOverwrite());
+ persistInstanceConfigurations(parameter.getLabels());
Collection<String> schemaNames =
Strings.isNullOrEmpty(parameter.getSchemaName()) ?
metaDataPersistService.getSchemaMetaDataService()
.loadAllNames() :
Collections.singletonList(parameter.getSchemaName());
Map<String, Map<String, DataSource>> clusterDataSources =
loadDataSourcesMap(metaDataPersistService, parameter.getDataSourcesMap(),
schemaNames);
@@ -125,6 +127,12 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
}
}
+    // Persists the labels of the current instance; does nothing when no labels are configured.
+    private void persistInstanceConfigurations(final Collection<String> labels) {
+        if (null == labels || labels.isEmpty()) {
+            return;
+        }
+        String currentInstanceId = Instance.getInstance().getId();
+        metaDataPersistService.persistInstanceConfigurations(currentInstanceId, labels);
+    }
+
private boolean isEmptyLocalConfiguration(final Map<String, Map<String,
DataSource>> dataSourcesMap,
final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
return isEmptyLocalDataSourcesMap(dataSourcesMap) &&
isEmptyLocalSchemaRuleConfigurations(schemaRuleConfigs) &&
globalRuleConfigs.isEmpty() && props.isEmpty();
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 26c283c..0a0a544 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -19,8 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.Instance;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
/**
@@ -35,6 +34,6 @@ public final class ComputeNodeStatusService {
* Register online.
*/
public void registerOnline() {
-
repository.persistEphemeral(ComputeStatusNode.getStatusPath(ComputeNodeStatus.ONLINE,
Instance.getInstance().getId()), "");
+
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(Instance.getInstance().getId()),
"");
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServi
[...]
index 3736c9f..4632329 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -41,6 +41,6 @@ public final class ComputeNodeStatusServiceTest {
@Test
public void assertRegisterOnline() {
new ComputeNodeStatusService(repository).registerOnline();
- verify(repository).persistEphemeral("/status/compute_nodes/online/" +
Instance.getInstance().getId(), "");
+ verify(repository).persistEphemeral("/nodes/compute_nodes/online/" +
Instance.getInstance().getId(), "");
}
}