jiajunwang commented on a change in pull request #1307:
URL: https://github.com/apache/helix/pull/1307#discussion_r490644166



##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {

Review comment:
       nit: should the ClusterConfig parameter be declared final too?

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());

Review comment:
       nit: use Collections.EMPTY_MAP here instead of allocating a new HashMap?

##########
File path: helix-core/src/main/java/org/apache/helix/model/TrieNode.java
##########
@@ -0,0 +1,55 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+
+public class TrieNode {
+  // A mapping between trie key and children nodes.
+  private Map<String, TrieNode> _children;
+
+  // the complete path/prefix leading to the current node.
+  private final String _path;
+
+  private final String _domainType;

Review comment:
       To keep this class generic, rename the field to _key (the node key)?

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {
+    for (String instanceName : instanceConfigMap.keySet()) {
+      try {
+        Map<String, String> domainAsMap = 
instanceConfigMap.get(instanceName).getDomainAsMap();
+        for (String key : _topologyKeys) {
+          String value = domainAsMap.get(key);
+          if (value == null || value.length() == 0) {
+            logger.info(String.format("Domain %s for instance %s is not set", 
domainAsMap.get(key),
+                instanceName));
+            _invalidInstances.add(instanceName);
+            break;
+          }
+        }
+      } catch (IllegalArgumentException e) {
+        _invalidInstances.add(instanceName);
+      }
+    }
+    _invalidInstances.forEach(entry -> instanceConfigMap.remove(entry));
+  }
+
+  private void validateInstanceConfig(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap) {
+    if (instanceConfigMap == null || 
!instanceConfigMap.keySet().containsAll(liveNodes)) {
+      List<String> liveNodesCopy = new ArrayList<>();
+      liveNodesCopy.addAll(liveNodes);
+      throw new HelixException(String.format("Config for instances %s is not 
found!",
+          instanceConfigMap == null ? liveNodes
+              : liveNodesCopy.removeAll(instanceConfigMap.keySet())));
+    }
+  }
+
+  // Note that we do not validate whether topology-aware is enabled or fault 
zone type is
+  // defined, as they do not block the construction of the trie
+  private void validateClusterConfig(ClusterConfig clusterConfig) {
+    String topologyDef = clusterConfig.getTopology();
+    if (topologyDef == null) {
+      throw new HelixException(String.format("The topology of cluster %s is 
empty!",
+          clusterConfig.getClusterName()));
+    }
+    // A list of all keys in cluster topology, e.g., a cluster topology 
defined as
+    // /group/zone/rack/host will return ["group", "zone", "rack", "host"].
+    _topologyKeys = Arrays.asList(topologyDef.trim().split(DELIMITER)).stream()
+        .filter(str -> 
!str.isEmpty()).collect(Collectors.toList()).toArray(new String[0]);
+    if (_topologyKeys.length == 0) {
+      throw new HelixException(String.format("The topology of cluster %s is 
not correctly defined",
+          clusterConfig.getClusterName()));
+    }
+  }
+
+  /**
+   * Constructs a trie based on the provided instance config map. It loops 
through all instance
+   * configs and constructs the trie in a top down manner.
+   */
+  private TrieNode constructTrie(Map<String, InstanceConfig> 
instanceConfigMap) {
+    TrieNode rootNode = new TrieNode(new HashMap<>(), "", "ROOT");
+    removeInvalidInstanceConfig(instanceConfigMap);
+    Map<String, Map<String, String>> instanceDomainsMap = new HashMap<>();
+    instanceConfigMap.entrySet().forEach(
+        entry -> instanceDomainsMap.put(entry.getKey(), 
entry.getValue().getDomainAsMap()));
+
+    for (Map.Entry<String, Map<String, String>> entry : 
instanceDomainsMap.entrySet()) {
+      TrieNode curNode = rootNode;
+      String path = "";
+      for (int i = 0; i < _topologyKeys.length; i++) {
+        String key = _topologyKeys[i] + CONNECTOR + 
entry.getValue().get(_topologyKeys[i]);
+        path = path + DELIMITER + key;
+        TrieNode nextNode = curNode.getChildren().get(key);
+        if (nextNode == null) {
+          nextNode = new TrieNode(new HashMap<>(), path, _topologyKeys[i]);

Review comment:
       It seems each path contains all the information starting from the root. 
Won't that be too much duplicated information?
   Considering that this object might be created frequently, depending on the 
caller code, it could be better to just save the info about the current layer.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);

Review comment:
       So, to summarize the following comments, my suggestion would be:
   
   validateInstanceConfig(liveNodes, instanceConfigMap);
   _topologyKeys = getTopologyKeys(clusterConfig);
   _faultZoneType = clusterConfig.getFaultZoneType();
   _invalidInstances = getInvalidInstances(instanceConfigMap);
   instanceConfigMap.keySet().removeAll(_invalidInstances);
   _rootNode = constructTrie(instanceConfigMap, _topologyKeys);
   
   Would this be cleaner and easier to maintain for the long term?

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {
+    LinkedHashMap<String, String> orderedDomain = 
validateAndOrderDomain(domain);
+    TrieNode startNode = getStartNode(orderedDomain);
+    Map<String, TrieNode> children = startNode.getChildren();
+    Map<String, List<String>> results = new HashMap<>();
+    children.entrySet().forEach(child -> {
+      results.put(startNode.getPath() + DELIMITER + child.getKey(),
+          truncatePath(getPathUnderNode(child.getValue()), 
child.getValue().getPath()));
+    });
+    return results;
+  }
+
+  /**
+   * Return the full topology of a certain domain type.
+   * @param domainType a specific type of domain, e.g. zone
+   * @return the topology of the given domain type, e.g. {["/group:0/zone:0": 
{"rack:0/host:0",
+   * "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomainType(String 
domainType) {
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    if (domainType.equals(topologyKeys[0])) {
+      return getTopologyMap();
+    }
+    Map<String, List<String>> results = new HashMap<>();
+    String parentDomainType = null;
+    for (int i = 1; i < topologyKeys.length; i++) {
+      if (topologyKeys[i].equals(domainType)) {
+        parentDomainType = topologyKeys[i - 1];
+        break;
+      }
+    }
+    // get all the starting nodes for the domain type
+    List<TrieNode> startNodes = getStartNodes(parentDomainType);
+    for (TrieNode startNode : startNodes) {
+      results.putAll(getTopologyUnderPath(startNode.getPath()));
+    }
+    return results;
+  }
+
+  /**
+   * Return the topology under a certain path as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderPath(String path) {
+    Map<String, String> domain = convertPathToDomain(path);
+    return getTopologyUnderDomain(domain);
+  }
+
+  /**
+   * Validate the domain provided has continuous fields in cluster topology 
definition. If it
+   * has, order the domain based on cluster topology definition. E.g. if the 
cluster topology is
+   * /group/zone/rack/instance, and domain is provided as {["zone": "1"], 
["group", "2"]} will be
+   * reordered in a LinkedinHashMap as {["group", "2"], ["zone": "1"]}
+   */
+  private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, 
String> domain) {
+    LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+    if (domain == null) {
+      throw new IllegalArgumentException("The domain should not be null");
+    }
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    for (int i = 0; i < domain.size(); i++) {
+      if (!domain.containsKey(topologyKeys[i])) {
+        throw new IllegalArgumentException(String
+            .format("The input domain is not valid, the key %s is required", 
topologyKeys[i]));
+      } else {
+        orderedDomain.put(topologyKeys[i], domain.get(topologyKeys[i]));
+      }
+    }
+    return orderedDomain;
+  }
+
+  /**
+   * Truncate each path in the given set and only retain path starting from 
current node's
+   * children to each end node.
+   * @param toRemovePath The path from root to current node. It should be 
removed so that users
+   *                     can get a better view.
+   */
+  private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+    List<String> results = new ArrayList<>();
+    paths.forEach(path -> {
+      String truncatedPath = path.replace(toRemovePath, "");
+      results.add(truncatedPath);
+    });
+    return results;
+  }
+
+  /**
+   * Return all the paths from a TrieNode as a set.
+   * @param node the node from where to collect all the nodes' paths.
+   * @return All the paths under the node.
+   */
+  private Set<String> getPathUnderNode(TrieNode node) {
+    Set<String> resultMap = new HashSet<>();
+    Deque<TrieNode> nodeStack = new ArrayDeque<>();
+    nodeStack.push(node);
+    while (!nodeStack.isEmpty()) {
+      node = nodeStack.pop();
+      if (node.getChildren().isEmpty()) {
+        resultMap.add(node.getPath());
+      } else {
+        for (TrieNode child : node.getChildren().values()) {
+          nodeStack.push(child);
+        }
+      }
+    }
+    return resultMap;
+  }
+
+  private TrieNode getStartNode(LinkedHashMap<String, String> domain) {
+    TrieNode curNode = _trieClusterTopology.getRootNode();
+    TrieNode nextNode;
+    for (Map.Entry<String, String> entry : domain.entrySet()) {
+      nextNode = curNode.getChildren().get(entry.getKey() + CONNECTOR + 
entry.getValue());
+      if (nextNode == null) {
+        throw new IllegalArgumentException(String
+            .format("The input domain %s does not have the value %s", 
entry.getKey(),
+                entry.getValue()));
+      }
+      curNode = nextNode;
+    }
+    return curNode;
+  }
+
+  private List<TrieNode> getStartNodes(String domain) {
+    List<TrieNode> results = new ArrayList<>();
+    TrieNode curNode = _trieClusterTopology.getRootNode();
+    Deque<TrieNode> nodeStack = new ArrayDeque<>();
+    nodeStack.push(curNode);
+    while (!nodeStack.isEmpty()) {
+      curNode = nodeStack.pop();
+      if (curNode.getDomainType().equals(domain)) {
+        results.add(curNode);
+      } else {
+        for (TrieNode child : curNode.getChildren().values()) {
+          nodeStack.push(child);
+        }
+      }
+    }
+    return results;
+  }

Review comment:
       I feel these methods can be put into the ClusterTrie class, given we can 
generalize the domain concept to the trie node keys. But we can also refactor 
later when we are migrating our existing Topology.java to use the same class.

##########
File path: helix-core/src/main/java/org/apache/helix/model/TrieNode.java
##########
@@ -0,0 +1,55 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+
+public class TrieNode {
+  // A mapping between trie key and children nodes.
+  private Map<String, TrieNode> _children;
+
+  // the complete path/prefix leading to the current node.
+  private final String _path;
+
+  private final String _domainType;
+
+  TrieNode(Map<String, TrieNode> children, String path, String domainType) {

Review comment:
       nit: I'm not sure if it is feasible given all the usages, but we could 
always create a new children map inside the constructor instead of accepting 
one as input.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
##########
@@ -1156,6 +1157,30 @@ public void removeCloudConfig(String clusterName) {
     accessor.removeProperty(keyBuilder.cloudConfig());
   }
 
+  @Override
+  public ClusterTopology getClusterTopology(String clusterName) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    String path = PropertyPathBuilder.instanceConfig(clusterName);
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+    List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0, 0, 0);
+    for (ZNRecord record : znRecords) {
+      if (record != null) {
+        InstanceConfig instanceConfig = new InstanceConfig(record);
+        instanceConfigMap.put(instanceConfig.getInstanceName(), 
instanceConfig);
+      }
+    }
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<LiveInstance> liveInstances = 
accessor.getChildValues(keyBuilder.liveInstances(), true);
+    List<String> liveNodes = new ArrayList<>();
+    liveInstances.forEach(liveInstance -> 
liveNodes.add(liveInstance.getInstanceName()));
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);

Review comment:
       nit: IMHO it would be cleaner if we used ZkBaseDataAccessor for all 
information access.

##########
File path: 
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
##########
@@ -1156,6 +1157,30 @@ public void removeCloudConfig(String clusterName) {
     accessor.removeProperty(keyBuilder.cloudConfig());
   }
 
+  @Override
+  public ClusterTopology getClusterTopology(String clusterName) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    String path = PropertyPathBuilder.instanceConfig(clusterName);
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+    List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0, 0, 0);
+    for (ZNRecord record : znRecords) {
+      if (record != null) {
+        InstanceConfig instanceConfig = new InstanceConfig(record);
+        instanceConfigMap.put(instanceConfig.getInstanceName(), 
instanceConfig);
+      }
+    }
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));

Review comment:
       Why do we need 2 ZkBaseDataAccessors here?

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {
+    LinkedHashMap<String, String> orderedDomain = 
validateAndOrderDomain(domain);
+    TrieNode startNode = getStartNode(orderedDomain);
+    Map<String, TrieNode> children = startNode.getChildren();
+    Map<String, List<String>> results = new HashMap<>();
+    children.entrySet().forEach(child -> {
+      results.put(startNode.getPath() + DELIMITER + child.getKey(),
+          truncatePath(getPathUnderNode(child.getValue()), 
child.getValue().getPath()));
+    });
+    return results;
+  }
+
+  /**
+   * Return the full topology of a certain domain type.
+   * @param domainType a specific type of domain, e.g. zone
+   * @return the topology of the given domain type, e.g. {["/group:0/zone:0": 
{"rack:0/host:0",
+   * "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomainType(String 
domainType) {
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    if (domainType.equals(topologyKeys[0])) {
+      return getTopologyMap();
+    }
+    Map<String, List<String>> results = new HashMap<>();
+    String parentDomainType = null;
+    for (int i = 1; i < topologyKeys.length; i++) {
+      if (topologyKeys[i].equals(domainType)) {
+        parentDomainType = topologyKeys[i - 1];
+        break;
+      }
+    }
+    // get all the starting nodes for the domain type
+    List<TrieNode> startNodes = getStartNodes(parentDomainType);
+    for (TrieNode startNode : startNodes) {
+      results.putAll(getTopologyUnderPath(startNode.getPath()));
+    }
+    return results;
+  }
+
+  /**
+   * Return the topology under a certain path as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderPath(String path) {
+    Map<String, String> domain = convertPathToDomain(path);
+    return getTopologyUnderDomain(domain);
+  }
+
+  /**
+   * Validate the domain provided has continuous fields in cluster topology 
definition. If it
+   * has, order the domain based on cluster topology definition. E.g. if the 
cluster topology is
+   * /group/zone/rack/instance, and domain is provided as {["zone": "1"], 
["group", "2"]} will be
+   * reordered in a LinkedinHashMap as {["group", "2"], ["zone": "1"]}
+   */
+  private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, 
String> domain) {
+    LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+    if (domain == null) {
+      throw new IllegalArgumentException("The domain should not be null");
+    }
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    for (int i = 0; i < domain.size(); i++) {
+      if (!domain.containsKey(topologyKeys[i])) {
+        throw new IllegalArgumentException(String
+            .format("The input domain is not valid, the key %s is required", 
topologyKeys[i]));
+      } else {
+        orderedDomain.put(topologyKeys[i], domain.get(topologyKeys[i]));
+      }
+    }
+    return orderedDomain;
+  }
+
+  /**
+   * Truncate each path in the given set and only retain path starting from 
current node's
+   * children to each end node.
+   * @param toRemovePath The path from root to current node. It should be 
removed so that users
+   *                     can get a better view.
+   */
+  private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+    List<String> results = new ArrayList<>();
+    paths.forEach(path -> {
+      String truncatedPath = path.replace(toRemovePath, "");
+      results.add(truncatedPath);
+    });
+    return results;
+  }
+
+  /**
+   * Return all the paths from a TrieNode as a set.
+   * @param node the node from where to collect all the nodes' paths.
+   * @return All the paths under the node.
+   */
+  private Set<String> getPathUnderNode(TrieNode node) {

Review comment:
       Regardless of what I mentioned above, I think you can merge 
getPathUnderNode() and getStartNodes() to avoid duplicate logic. Some minor 
changes are required, of course, but I think it's a better option.
   
   High-level, you need a method that searches for multiple nodes under a 
certain scope. And another method that gets a certain node (getStartNode).

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {

Review comment:
       We may not need these 2 functions to support the public methods.
   I think what we want is a query method that accepts a Map of domain type and 
value pairs. The trick is that a value could be a wildcard "*" or simply 
null. Then we only need to maintain one set of logic, with no duplicate 
logic.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {

Review comment:
       Better to pass _topologyKeys in as a parameter. This is a better style 
since the private methods will not depend on each other.

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {
+    LinkedHashMap<String, String> orderedDomain = 
validateAndOrderDomain(domain);
+    TrieNode startNode = getStartNode(orderedDomain);
+    Map<String, TrieNode> children = startNode.getChildren();
+    Map<String, List<String>> results = new HashMap<>();
+    children.entrySet().forEach(child -> {
+      results.put(startNode.getPath() + DELIMITER + child.getKey(),
+          truncatePath(getPathUnderNode(child.getValue()), 
child.getValue().getPath()));
+    });
+    return results;
+  }
+
+  /**
+   * Return the full topology of a certain domain type.
+   * @param domainType a specific type of domain, e.g. zone
+   * @return the topology of the given domain type, e.g. {["/group:0/zone:0": 
{"rack:0/host:0",
+   * "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomainType(String 
domainType) {
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    if (domainType.equals(topologyKeys[0])) {
+      return getTopologyMap();
+    }
+    Map<String, List<String>> results = new HashMap<>();
+    String parentDomainType = null;
+    for (int i = 1; i < topologyKeys.length; i++) {
+      if (topologyKeys[i].equals(domainType)) {
+        parentDomainType = topologyKeys[i - 1];
+        break;
+      }
+    }
+    // get all the starting nodes for the domain type
+    List<TrieNode> startNodes = getStartNodes(parentDomainType);
+    for (TrieNode startNode : startNodes) {
+      results.putAll(getTopologyUnderPath(startNode.getPath()));
+    }
+    return results;
+  }
+
+  /**
+   * Return the topology under a certain path as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderPath(String path) {
+    Map<String, String> domain = convertPathToDomain(path);
+    return getTopologyUnderDomain(domain);
+  }
+
+  /**
+   * Validate the domain provided has continuous fields in cluster topology 
definition. If it
+   * has, order the domain based on cluster topology definition. E.g. if the 
cluster topology is
+   * /group/zone/rack/instance, and domain is provided as {["zone": "1"], 
["group", "2"]} will be
+   * reordered in a LinkedinHashMap as {["group", "2"], ["zone": "1"]}
+   */
+  private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, 
String> domain) {
+    LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+    if (domain == null) {
+      throw new IllegalArgumentException("The domain should not be null");
+    }
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    for (int i = 0; i < domain.size(); i++) {
+      if (!domain.containsKey(topologyKeys[i])) {
+        throw new IllegalArgumentException(String
+            .format("The input domain is not valid, the key %s is required", 
topologyKeys[i]));
+      } else {
+        orderedDomain.put(topologyKeys[i], domain.get(topologyKeys[i]));
+      }
+    }
+    return orderedDomain;
+  }
+
+  /**
+   * Truncate each path in the given set and only retain path starting from 
current node's
+   * children to each end node.
+   * @param toRemovePath The path from root to current node. It should be 
removed so that users
+   *                     can get a better view.
+   */
+  private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+    List<String> results = new ArrayList<>();
+    paths.forEach(path -> {
+      String truncatedPath = path.replace(toRemovePath, "");
+      results.add(truncatedPath);
+    });
+    return results;
+  }
+
+  /**
+   * Return all the paths from a TrieNode as a set.
+   * @param node the node from where to collect all the nodes' paths.
+   * @return All the paths under the node.
+   */
+  private Set<String> getPathUnderNode(TrieNode node) {
+    Set<String> resultMap = new HashSet<>();
+    Deque<TrieNode> nodeStack = new ArrayDeque<>();
+    nodeStack.push(node);
+    while (!nodeStack.isEmpty()) {
+      node = nodeStack.pop();
+      if (node.getChildren().isEmpty()) {
+        resultMap.add(node.getPath());
+      } else {
+        for (TrieNode child : node.getChildren().values()) {
+          nodeStack.push(child);
+        }
+      }
+    }
+    return resultMap;
+  }
+
+  private TrieNode getStartNode(LinkedHashMap<String, String> domain) {

Review comment:
       nit, domain -> domainMap

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {

Review comment:
       In general, it is better for private methods to be "independent" of the 
object, so that maintenance of the class is much easier.
   
   Suggest changing it to
   private List<String> getInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap);
   
   Then we can do instanceConfigMap.keySet().removeAll(_invalidInstances) in 
the caller, where the object fields are updated in a centralized place.

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {
+    LinkedHashMap<String, String> orderedDomain = 
validateAndOrderDomain(domain);
+    TrieNode startNode = getStartNode(orderedDomain);
+    Map<String, TrieNode> children = startNode.getChildren();
+    Map<String, List<String>> results = new HashMap<>();
+    children.entrySet().forEach(child -> {
+      results.put(startNode.getPath() + DELIMITER + child.getKey(),
+          truncatePath(getPathUnderNode(child.getValue()), 
child.getValue().getPath()));
+    });
+    return results;
+  }
+
+  /**
+   * Return the full topology of a certain domain type.
+   * @param domainType a specific type of domain, e.g. zone
+   * @return the topology of the given domain type, e.g. {["/group:0/zone:0": 
{"rack:0/host:0",
+   * "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomainType(String 
domainType) {
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    if (domainType.equals(topologyKeys[0])) {
+      return getTopologyMap();
+    }
+    Map<String, List<String>> results = new HashMap<>();
+    String parentDomainType = null;
+    for (int i = 1; i < topologyKeys.length; i++) {
+      if (topologyKeys[i].equals(domainType)) {
+        parentDomainType = topologyKeys[i - 1];
+        break;
+      }
+    }
+    // get all the starting nodes for the domain type
+    List<TrieNode> startNodes = getStartNodes(parentDomainType);
+    for (TrieNode startNode : startNodes) {
+      results.putAll(getTopologyUnderPath(startNode.getPath()));
+    }
+    return results;
+  }
+
+  /**
+   * Return the topology under a certain path as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderPath(String path) {
+    Map<String, String> domain = convertPathToDomain(path);
+    return getTopologyUnderDomain(domain);
+  }
+
+  /**
+   * Validate the domain provided has continuous fields in cluster topology 
definition. If it
+   * has, order the domain based on cluster topology definition. E.g. if the 
cluster topology is
+   * /group/zone/rack/instance, and domain is provided as {["zone": "1"], 
["group", "2"]} will be
+   * reordered in a LinkedinHashMap as {["group", "2"], ["zone": "1"]}
+   */
+  private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, 
String> domain) {
+    LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+    if (domain == null) {
+      throw new IllegalArgumentException("The domain should not be null");
+    }
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    for (int i = 0; i < domain.size(); i++) {
+      if (!domain.containsKey(topologyKeys[i])) {
+        throw new IllegalArgumentException(String
+            .format("The input domain is not valid, the key %s is required", 
topologyKeys[i]));
+      } else {
+        orderedDomain.put(topologyKeys[i], domain.get(topologyKeys[i]));
+      }
+    }
+    return orderedDomain;
+  }
+
+  /**
+   * Truncate each path in the given set and only retain path starting from 
current node's
+   * children to each end node.
+   * @param toRemovePath The path from root to current node. It should be 
removed so that users
+   *                     can get a better view.
+   */
+  private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+    List<String> results = new ArrayList<>();
+    paths.forEach(path -> {
+      String truncatedPath = path.replace(toRemovePath, "");
+      results.add(truncatedPath);
+    });
+    return results;
+  }
+
+  /**
+   * Return all the paths from a TrieNode as a set.
+   * @param node the node from where to collect all the nodes' paths.
+   * @return All the paths under the node.
+   */
+  private Set<String> getPathUnderNode(TrieNode node) {
+    Set<String> resultMap = new HashSet<>();
+    Deque<TrieNode> nodeStack = new ArrayDeque<>();
+    nodeStack.push(node);
+    while (!nodeStack.isEmpty()) {
+      node = nodeStack.pop();
+      if (node.getChildren().isEmpty()) {
+        resultMap.add(node.getPath());
+      } else {
+        for (TrieNode child : node.getChildren().values()) {
+          nodeStack.push(child);
+        }
+      }
+    }
+    return resultMap;
+  }
+
+  private TrieNode getStartNode(LinkedHashMap<String, String> domain) {
+    TrieNode curNode = _trieClusterTopology.getRootNode();
+    TrieNode nextNode;
+    for (Map.Entry<String, String> entry : domain.entrySet()) {
+      nextNode = curNode.getChildren().get(entry.getKey() + CONNECTOR + 
entry.getValue());
+      if (nextNode == null) {
+        throw new IllegalArgumentException(String
+            .format("The input domain %s does not have the value %s", 
entry.getKey(),
+                entry.getValue()));
+      }
+      curNode = nextNode;
+    }
+    return curNode;
+  }
+
+  private List<TrieNode> getStartNodes(String domain) {

Review comment:
       nit: rename the parameter domain -> domainType, since the caller passes 
a domain type here rather than a domain value.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {
+    for (String instanceName : instanceConfigMap.keySet()) {
+      try {
+        Map<String, String> domainAsMap = 
instanceConfigMap.get(instanceName).getDomainAsMap();
+        for (String key : _topologyKeys) {
+          String value = domainAsMap.get(key);
+          if (value == null || value.length() == 0) {
+            logger.info(String.format("Domain %s for instance %s is not set", 
domainAsMap.get(key),
+                instanceName));
+            _invalidInstances.add(instanceName);
+            break;
+          }
+        }
+      } catch (IllegalArgumentException e) {
+        _invalidInstances.add(instanceName);
+      }
+    }
+    _invalidInstances.forEach(entry -> instanceConfigMap.remove(entry));
+  }
+
+  private void validateInstanceConfig(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap) {
+    if (instanceConfigMap == null || 
!instanceConfigMap.keySet().containsAll(liveNodes)) {
+      List<String> liveNodesCopy = new ArrayList<>();
+      liveNodesCopy.addAll(liveNodes);
+      throw new HelixException(String.format("Config for instances %s is not 
found!",
+          instanceConfigMap == null ? liveNodes
+              : liveNodesCopy.removeAll(instanceConfigMap.keySet())));
+    }
+  }
+
+  // Note that we do not validate whether topology-aware is enabled or fault 
zone type is
+  // defined, as they do not block the construction of the trie
+  private void validateClusterConfig(ClusterConfig clusterConfig) {
+    String topologyDef = clusterConfig.getTopology();
+    if (topologyDef == null) {
+      throw new HelixException(String.format("The topology of cluster %s is 
empty!",
+          clusterConfig.getClusterName()));
+    }
+    // A list of all keys in cluster topology, e.g., a cluster topology 
defined as
+    // /group/zone/rack/host will return ["group", "zone", "rack", "host"].
+    _topologyKeys = Arrays.asList(topologyDef.trim().split(DELIMITER)).stream()
+        .filter(str -> 
!str.isEmpty()).collect(Collectors.toList()).toArray(new String[0]);
+    if (_topologyKeys.length == 0) {
+      throw new HelixException(String.format("The topology of cluster %s is 
not correctly defined",
+          clusterConfig.getClusterName()));
+    }
+  }
+
+  /**
+   * Constructs a trie based on the provided instance config map. It loops 
through all instance
+   * configs and constructs the trie in a top down manner.
+   */
+  private TrieNode constructTrie(Map<String, InstanceConfig> 
instanceConfigMap) {

Review comment:
       Same here: it would be better to pass _topologyKeys in through the 
parameter list instead of reading the object field inside the private method.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {
+    for (String instanceName : instanceConfigMap.keySet()) {
+      try {
+        Map<String, String> domainAsMap = 
instanceConfigMap.get(instanceName).getDomainAsMap();
+        for (String key : _topologyKeys) {
+          String value = domainAsMap.get(key);
+          if (value == null || value.length() == 0) {
+            logger.info(String.format("Domain %s for instance %s is not set", 
domainAsMap.get(key),
+                instanceName));
+            _invalidInstances.add(instanceName);
+            break;
+          }
+        }
+      } catch (IllegalArgumentException e) {
+        _invalidInstances.add(instanceName);
+      }
+    }
+    _invalidInstances.forEach(entry -> instanceConfigMap.remove(entry));

Review comment:
       This can be simplified to 
instanceConfigMap.keySet().removeAll(_invalidInstances)

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterTrie.java
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a class that uses a trie data structure to represent cluster 
topology. Each node
+ * except the terminal node represents a certain domain in the topology, and 
an terminal node
+ * represents an instance in the cluster.
+ */
+public class ClusterTrie {
+  public static final String DELIMITER = "/";
+  public static final String CONNECTOR = ":";
+
+  private static Logger logger = LoggerFactory.getLogger(ClusterTrie.class);
+  private TrieNode _rootNode;
+  private String[] _topologyKeys;
+  private String _faultZoneType;
+  private List<String> _invalidInstances = new ArrayList<>();
+
+  public ClusterTrie(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    validateInstanceConfig(liveNodes, instanceConfigMap);
+    validateClusterConfig(clusterConfig);
+    _faultZoneType = clusterConfig.getFaultZoneType();
+    _rootNode = constructTrie(instanceConfigMap);
+  }
+
+
+  public TrieNode getRootNode() {
+    return _rootNode;
+  }
+
+  public String[] getTopologyKeys() {
+    return _topologyKeys;
+  }
+
+  public  String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public List<String> getInvalidInstances() {
+    return _invalidInstances;
+  }
+
+  private void removeInvalidInstanceConfig(Map<String, InstanceConfig> 
instanceConfigMap) {
+    for (String instanceName : instanceConfigMap.keySet()) {
+      try {
+        Map<String, String> domainAsMap = 
instanceConfigMap.get(instanceName).getDomainAsMap();
+        for (String key : _topologyKeys) {
+          String value = domainAsMap.get(key);
+          if (value == null || value.length() == 0) {
+            logger.info(String.format("Domain %s for instance %s is not set", 
domainAsMap.get(key),
+                instanceName));
+            _invalidInstances.add(instanceName);
+            break;
+          }
+        }
+      } catch (IllegalArgumentException e) {
+        _invalidInstances.add(instanceName);
+      }
+    }
+    _invalidInstances.forEach(entry -> instanceConfigMap.remove(entry));
+  }
+
+  private void validateInstanceConfig(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap) {
+    if (instanceConfigMap == null || 
!instanceConfigMap.keySet().containsAll(liveNodes)) {
+      List<String> liveNodesCopy = new ArrayList<>();
+      liveNodesCopy.addAll(liveNodes);
+      throw new HelixException(String.format("Config for instances %s is not 
found!",
+          instanceConfigMap == null ? liveNodes
+              : liveNodesCopy.removeAll(instanceConfigMap.keySet())));
+    }
+  }
+
+  // Note that we do not validate whether topology-aware is enabled or fault 
zone type is
+  // defined, as they do not block the construction of the trie
+  private void validateClusterConfig(ClusterConfig clusterConfig) {

Review comment:
       nit: just return the topology keys as the return value, so that we 
don't secretly modify the object field inside a "validate" method. Moreover, 
calling it getTopologyDef() might be more straightforward.

##########
File path: 
helix-core/src/main/java/org/apache/helix/api/topology/ClusterTopology.java
##########
@@ -0,0 +1,248 @@
+package org.apache.helix.api.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ClusterTrie;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.TrieNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.model.ClusterTrie.CONNECTOR;
+import static org.apache.helix.model.ClusterTrie.DELIMITER;
+
+
+public class ClusterTopology {
+  private static Logger logger = 
LoggerFactory.getLogger(ClusterTopology.class);
+
+  private final ClusterTrie _trieClusterTopology;
+
+  public ClusterTopology(final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig 
clusterConfig) {
+    _trieClusterTopology = new ClusterTrie(liveNodes, instanceConfigMap, 
clusterConfig);
+  }
+
+  /**
+   * Return the whole topology of a cluster as a map. The key of the map is 
the first level of
+   * domain, and the value is a list of string that represents the path to 
each end node in that
+   * domain. E.g., assume the topology is defined as /group/zone/rack/host, 
the result may be {
+   * ["/group:0": {"/zone:0/rack:0/host:0", "/zone:1/rack:1/host:1"}], 
["/group:1": {"/zone:1
+   * /rack:1/host:1", "/zone:1/rack:1/host:2"}]}
+   */
+  public Map<String, List<String>> getTopologyMap() {
+    return getTopologyUnderDomain(new HashMap<>());
+  }
+
+  /**
+   * Return all the instances under fault zone type. The key of the returned 
map is each fault
+   * zone name, and the value is a list of string that represents the path to 
each end node in
+   * that fault zone.
+   * @return , e.g. if the fault zone is "zone", it may return 
{["/group:0/zone:0": {"rack:0/host
+   * :0", "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  public Map<String, List<String>> getFaultZoneMap() {
+    String faultZone = _trieClusterTopology.getFaultZoneType();
+    if (faultZone == null) {
+      throw new IllegalArgumentException("The fault zone in cluster config is 
not defined");
+    }
+    return getTopologyUnderDomainType(faultZone);
+  }
+
+  /**
+   * Return the instances whose domain field is not valid
+   */
+  public List<String> getInvalidInstances() {
+    return _trieClusterTopology.getInvalidInstances();
+  }
+
+  /**
+   * Return the topology under a certain domain as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param domain A map defining the domain name and its value, e.g. 
{["group": "1"], ["zone",
+   *               "2"]}
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomain(Map<String, String> 
domain) {
+    LinkedHashMap<String, String> orderedDomain = 
validateAndOrderDomain(domain);
+    TrieNode startNode = getStartNode(orderedDomain);
+    Map<String, TrieNode> children = startNode.getChildren();
+    Map<String, List<String>> results = new HashMap<>();
+    children.entrySet().forEach(child -> {
+      results.put(startNode.getPath() + DELIMITER + child.getKey(),
+          truncatePath(getPathUnderNode(child.getValue()), 
child.getValue().getPath()));
+    });
+    return results;
+  }
+
+  /**
+   * Return the full topology of a certain domain type.
+   * @param domainType a specific type of domain, e.g. zone
+   * @return the topology of the given domain type, e.g. {["/group:0/zone:0": 
{"rack:0/host:0",
+   * "rack:1/host:1"}, ["/group:0/zone:1": {"/rack:0:host:2", 
"/rack:1/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderDomainType(String 
domainType) {
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    if (domainType.equals(topologyKeys[0])) {
+      return getTopologyMap();
+    }
+    Map<String, List<String>> results = new HashMap<>();
+    String parentDomainType = null;
+    for (int i = 1; i < topologyKeys.length; i++) {
+      if (topologyKeys[i].equals(domainType)) {
+        parentDomainType = topologyKeys[i - 1];
+        break;
+      }
+    }
+    // get all the starting nodes for the domain type
+    List<TrieNode> startNodes = getStartNodes(parentDomainType);
+    for (TrieNode startNode : startNodes) {
+      results.putAll(getTopologyUnderPath(startNode.getPath()));
+    }
+    return results;
+  }
+
+  /**
+   * Return the topology under a certain path as a map. The key of the 
returned map is the next
+   * level domain, and the value is a list of string that represents the path 
to each end node in
+   * that domain.
+   * @param path a path to a certain Trie node, e.g. /group:1/zone:2
+   * @return the topology under the given domain, e.g. 
{["/group:1/zone:2/rack:0": {"/host:0",
+   * "/host:1"}, ["/group:1/zone:2/rack:1": {"/host:2", "/host:3"}]}
+   */
+  private Map<String, List<String>> getTopologyUnderPath(String path) {
+    Map<String, String> domain = convertPathToDomain(path);
+    return getTopologyUnderDomain(domain);
+  }
+
+  /**
+   * Validate the domain provided has continuous fields in cluster topology 
definition. If it
+   * has, order the domain based on cluster topology definition. E.g. if the 
cluster topology is
+   * /group/zone/rack/instance, and domain is provided as {["zone": "1"], 
["group", "2"]} will be
+   * reordered in a LinkedinHashMap as {["group", "2"], ["zone": "1"]}
+   */
+  private LinkedHashMap<String, String> validateAndOrderDomain(Map<String, 
String> domain) {
+    LinkedHashMap<String, String> orderedDomain = new LinkedHashMap<>();
+    if (domain == null) {
+      throw new IllegalArgumentException("The domain should not be null");
+    }
+    String[] topologyKeys = _trieClusterTopology.getTopologyKeys();
+    for (int i = 0; i < domain.size(); i++) {
+      if (!domain.containsKey(topologyKeys[i])) {
+        throw new IllegalArgumentException(String
+            .format("The input domain is not valid, the key %s is required", 
topologyKeys[i]));
+      } else {
+        orderedDomain.put(topologyKeys[i], domain.get(topologyKeys[i]));
+      }
+    }
+    return orderedDomain;
+  }
+
+  /**
+   * Truncate each path in the given set and only retain path starting from 
current node's
+   * children to each end node.
+   * @param toRemovePath The path from root to current node. It should be 
removed so that users
+   *                     can get a better view.
+   */
+  private List<String> truncatePath(Set<String> paths, String toRemovePath) {
+    List<String> results = new ArrayList<>();
+    paths.forEach(path -> {
+      String truncatedPath = path.replace(toRemovePath, "");
+      results.add(truncatedPath);
+    });
+    return results;
+  }
+
+  /**
+   * Return all the paths from a TrieNode as a set.
+   * @param node the node from where to collect all the nodes' paths.
+   * @return All the paths under the node.
+   */
+  private Set<String> getPathUnderNode(TrieNode node) {
+    Set<String> resultMap = new HashSet<>();
+    Deque<TrieNode> nodeStack = new ArrayDeque<>();
+    nodeStack.push(node);
+    while (!nodeStack.isEmpty()) {
+      node = nodeStack.pop();
+      if (node.getChildren().isEmpty()) {
+        resultMap.add(node.getPath());
+      } else {
+        for (TrieNode child : node.getChildren().values()) {
+          nodeStack.push(child);
+        }
+      }
+    }
+    return resultMap;
+  }
+
+  private TrieNode getStartNode(LinkedHashMap<String, String> domain) {

Review comment:
       Also, just curious: where does the "start" in the name come from? The 
leaf node is a special case, but the logic here is generic, so this is really 
just getNode(), right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to