This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2150e7a  refactor compute node of registry center (#14516)
2150e7a is described below

commit 2150e7a9e2e40ab68d0004ed7b2125e5eb21c444
Author: Haoran Meng <[email protected]>
AuthorDate: Wed Jan 5 08:57:12 2022 +0800

    refactor compute node of registry center (#14516)
    
    Co-authored-by: menghaoran <[email protected]>
---
 .../infra/instance/ComputeNodeInstance.java        | 40 ++++++++++++
 .../metadata/persist/MetaDataPersistService.java   | 39 +++++++++++
 .../mode/metadata/persist/node/ComputeNode.java    | 65 ++++++++++++++++++
 .../persist/service/ComputeNodePersistService.java | 76 ++++++++++++++++++++++
 .../cluster/ClusterContextManagerBuilder.java      |  8 +++
 .../compute/service/ComputeNodeStatusService.java  |  5 +-
 .../service/ComputeNodeStatusServiceTest.java      |  2 +-
 7 files changed, 231 insertions(+), 4 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
new file mode 100644
index 0000000..036b3c6
--- /dev/null
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.instance;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+
+import java.util.Collection;
+
+/**
+ * Instance of compute node.
+ *
+ * <p>Plain mutable holder: all accessors and mutators are generated by Lombok's
+ * {@code @Getter} and {@code @Setter}; no field is initialised here, so each one is
+ * {@code null} until a setter is called.</p>
+ */
+@Getter
+@Setter
+public final class ComputeNodeInstance {
+    
+    // IP of the compute node, kept as text.
+    private String ip;
+    
+    // Port of the compute node, kept as text rather than as a number.
+    private String port;
+    
+    // Users attached to this instance.
+    private Collection<ShardingSphereUser> users;
+    
+    // Labels attached to this instance.
+    private Collection<String> labels;
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
index a651995..e6a6835 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/MetaDataPersistService.java
@@ -18,8 +18,12 @@
 package org.apache.shardingsphere.mode.metadata.persist;
 
 import lombok.Getter;
+import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import 
org.apache.shardingsphere.mode.metadata.persist.service.ComputeNodePersistService;
 import 
org.apache.shardingsphere.mode.metadata.persist.service.SchemaMetaDataPersistService;
 import 
org.apache.shardingsphere.mode.metadata.persist.service.impl.DataSourcePersistService;
 import 
org.apache.shardingsphere.mode.metadata.persist.service.impl.GlobalRulePersistService;
@@ -27,9 +31,11 @@ import 
org.apache.shardingsphere.mode.metadata.persist.service.impl.PropertiesPe
 import 
org.apache.shardingsphere.mode.metadata.persist.service.impl.SchemaRulePersistService;
 import org.apache.shardingsphere.mode.persist.PersistRepository;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -50,6 +56,8 @@ public final class MetaDataPersistService {
     
     private final PropertiesPersistService propsService;
     
+    private final ComputeNodePersistService computeNodePersistService;
+    
     public MetaDataPersistService(final PersistRepository repository) {
         this.repository = repository;
         dataSourceService = new DataSourcePersistService(repository);
@@ -57,6 +65,7 @@ public final class MetaDataPersistService {
         schemaRuleService = new SchemaRulePersistService(repository);
         globalRuleService = new GlobalRulePersistService(repository);
         propsService = new PropertiesPersistService(repository);
+        computeNodePersistService = new ComputeNodePersistService(repository);
     }
     
     /**
@@ -78,4 +87,34 @@ public final class MetaDataPersistService {
             schemaRuleService.persist(schemaName, 
schemaRuleConfigs.get(schemaName), isOverwrite);
         }
     }
+    
+    /**
+     * Persist instance configurations.
+     * 
+     * <p>The only configuration written here is the label collection; the call is
+     * delegated to the compute node persist service.</p>
+     * 
+     * @param instanceId instance id
+     * @param labels collection of label
+     */
+    public void persistInstanceConfigurations(final String instanceId, final 
Collection<String> labels) {
+        computeNodePersistService.persistInstanceLabels(instanceId, labels);
+    }
+    
+    /**
+     * Load compute node instances by labels.
+     * 
+     * <p>Loads every compute node instance known to the compute node persist service and,
+     * when at least one exists, assigns to each of them the users of the first
+     * {@link AuthorityRuleConfiguration} found among the global rules (an empty collection
+     * when there is none). All instances share the same user collection object.</p>
+     * 
+     * <p>NOTE(review): {@code labels} is not read by this method, so the result is not
+     * filtered by label - confirm whether filtering is intended.</p>
+     * 
+     * @param labels collection of label
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadComputeNodeInstances(final Collection<String> labels) {
+        Collection<ComputeNodeInstance> result = computeNodePersistService.loadAllComputeNodeInstances();
+        if (result.isEmpty()) {
+            return result;
+        }
+        Collection<ShardingSphereUser> users = new ArrayList<>();
+        Optional<AuthorityRuleConfiguration> authorityRuleConfig = globalRuleService.load().stream()
+                .filter(AuthorityRuleConfiguration.class::isInstance).map(AuthorityRuleConfiguration.class::cast).findFirst();
+        authorityRuleConfig.ifPresent(config -> users.addAll(config.getUsers()));
+        for (ComputeNodeInstance each : result) {
+            each.setUsers(users);
+        }
+        return result;
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
new file mode 100644
index 0000000..a2a197b
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.node;
+
+/**
+ * Compute node.
+ *
+ * <p>Static helper that builds registry paths for compute nodes. It holds no state and
+ * is not meant to be instantiated.</p>
+ */
+public final class ComputeNode {
+    
+    private static final String ROOT_NODE = "nodes";
+    
+    private static final String COMPUTE_NODE = "compute_nodes";
+    
+    private static final String ONLINE_NODE = "online";
+    
+    private static final String ATTRIBUTES_NODE = "attributes";
+    
+    private static final String LABEL_NODE = "label";
+    
+    private static final String STATUS_NODE = "status";
+    
+    // Utility class: only static members, so instantiation is prevented.
+    private ComputeNode() {
+    }
+    
+    /**
+     * Get online compute node path.
+     * 
+     * <p>The leading empty element passed to {@code String.join} produces the leading
+     * slash, so the result is {@code /nodes/compute_nodes/online}.</p>
+     * 
+     * @return path of online compute node
+     */
+    public static String getOnlineNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE);
+    }
+    
+    /**
+     * Get online compute node instance path.
+     *
+     * <p>The result is {@code /nodes/compute_nodes/online/} followed by the instance id.</p>
+     *
+     * @param instanceId instance id
+     * @return path of online compute node instance
+     */
+    public static String getOnlineInstanceNodePath(final String instanceId) {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ONLINE_NODE, instanceId);
+    }
+    
+    /**
+     * Get online compute node instance label path.
+     *
+     * <p>The result is {@code /nodes/compute_nodes/attributes/}, the instance id, then
+     * {@code /label}.</p>
+     *
+     * @param instanceId instance id
+     * @return path of compute node instance label
+     */
+    public static String getInstanceLabelNodePath(final String instanceId) {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, ATTRIBUTES_NODE, instanceId, LABEL_NODE);
+    }
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
new file mode 100644
index 0000000..09e2568
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/service/ComputeNodePersistService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.metadata.persist.service;
+
+import com.google.common.base.Splitter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.persist.PersistRepository;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Compute node persist service.
+ */
+@RequiredArgsConstructor
+public final class ComputeNodePersistService {
+    
+    private final PersistRepository repository;
+    
+    /**
+     * Persist instance labels.
+     * 
+     * <p>The labels are marshalled with {@link YamlEngine} and written under the label
+     * path of the given instance.</p>
+     * 
+     * @param instanceId instance id
+     * @param labels collection of label
+     */
+    public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
+        String labelNodePath = ComputeNode.getInstanceLabelNodePath(instanceId);
+        repository.persist(labelNodePath, YamlEngine.marshal(labels));
+    }
+    
+    /**
+     * Load instance labels.
+     * 
+     * <p>Reads the value stored under the label path of the given instance and
+     * unmarshals it with {@link YamlEngine} as a raw {@code Collection}.</p>
+     * 
+     * @param instanceId instance id
+     * @return collection of label
+     */
+    public Collection<String> loadInstanceLabels(final String instanceId) {
+        String yamlContent = repository.get(ComputeNode.getInstanceLabelNodePath(instanceId));
+        return YamlEngine.unmarshal(yamlContent, Collection.class);
+    }
+    
+    /**
+     * Load all compute node instances.
+     * 
+     * <p>Each child key of the online path is treated as an instance id: it is split on
+     * {@code @}, the first part becomes the IP and the second the port. A key with no
+     * {@code @} therefore fails with an index error on the second part.</p>
+     * 
+     * @return collection of compute node instance
+     */
+    public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
+        Collection<String> onlineInstanceIds = repository.getChildrenKeys(ComputeNode.getOnlineNodePath());
+        List<ComputeNodeInstance> result = new ArrayList<>(onlineInstanceIds.size());
+        for (String each : onlineInstanceIds) {
+            List<String> ipAndPort = Splitter.on("@").splitToList(each);
+            ComputeNodeInstance instance = new ComputeNodeInstance();
+            instance.setIp(ipAndPort.get(0));
+            instance.setPort(ipAndPort.get(1));
+            instance.setLabels(loadInstanceLabels(each));
+            result.add(instance);
+        }
+        return result;
+    }
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 6b3ab9b..e606307 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
+import org.apache.shardingsphere.infra.instance.Instance;
 import org.apache.shardingsphere.infra.metadata.schema.QualifiedSchema;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
@@ -91,6 +92,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         
ModeScheduleContextFactory.getInstance().init(parameter.getModeConfig());
         metaDataPersistService = new MetaDataPersistService(repository);
         persistConfigurations(metaDataPersistService, 
parameter.getDataSourcesMap(), parameter.getSchemaRuleConfigs(), 
parameter.getGlobalRuleConfigs(), parameter.getProps(), 
parameter.isOverwrite());
+        persistInstanceConfigurations(parameter.getLabels());
         Collection<String> schemaNames = 
Strings.isNullOrEmpty(parameter.getSchemaName()) ? 
metaDataPersistService.getSchemaMetaDataService()
                 .loadAllNames() : 
Collections.singletonList(parameter.getSchemaName());
         Map<String, Map<String, DataSource>> clusterDataSources = 
loadDataSourcesMap(metaDataPersistService, parameter.getDataSourcesMap(), 
schemaNames);
@@ -125,6 +127,12 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         }
     }
     
+    // Persists the labels of the current instance; does nothing when no labels are given.
+    private void persistInstanceConfigurations(final Collection<String> labels) {
+        if (null == labels || labels.isEmpty()) {
+            return;
+        }
+        String instanceId = Instance.getInstance().getId();
+        metaDataPersistService.persistInstanceConfigurations(instanceId, labels);
+    }
+    
     private boolean isEmptyLocalConfiguration(final Map<String, Map<String, 
DataSource>> dataSourcesMap,
                                               final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
         return isEmptyLocalDataSourcesMap(dataSourcesMap) && 
isEmptyLocalSchemaRuleConfigurations(schemaRuleConfigs) && 
globalRuleConfigs.isEmpty() && props.isEmpty();
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 26c283c..0a0a544 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -19,8 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.Instance;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 /**
@@ -35,6 +34,6 @@ public final class ComputeNodeStatusService {
      * Register online.
      */
     public void registerOnline() {
-        
repository.persistEphemeral(ComputeStatusNode.getStatusPath(ComputeNodeStatus.ONLINE,
 Instance.getInstance().getId()), "");
+        
repository.persistEphemeral(ComputeNode.getOnlineInstanceNodePath(Instance.getInstance().getId()),
 "");
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServi
 [...]
index 3736c9f..4632329 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -41,6 +41,6 @@ public final class ComputeNodeStatusServiceTest {
     @Test
     public void assertRegisterOnline() {
         new ComputeNodeStatusService(repository).registerOnline();
-        verify(repository).persistEphemeral("/status/compute_nodes/online/" + 
Instance.getInstance().getId(), "");
+        verify(repository).persistEphemeral("/nodes/compute_nodes/online/" + 
Instance.getInstance().getId(), "");
     }
 }

Reply via email to