OneSizeFitsQuorum commented on code in PR #15014:
URL: https://github.com/apache/iotdb/pull/15014#discussion_r1990505238


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java:
##########
@@ -769,6 +776,22 @@ public void waitForLeaderElection(List<TConsensusGroupId> 
regionGroupIds) {
         regionGroupIds);
   }
 
+  public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
+    if (!latestTopology.equals(topologyGraph)) {
+      LOGGER.info("[Topology Service] Cluster topology changed, latest: {}", 
latestTopology);
+      topologyGraph = latestTopology;
+      topologyChanged.set(true);
+    }
+  }
+
+  @Nullable
+  public Map<Integer, Set<Integer>> getTopology() {

Review Comment:
   Here are some things to consider:
   1. How to update the topology after this heartbeat fails
   2. How to update the topology when a node restarts
   
   There are a lot of corner cases to deal with. My advice is to maintain a 
version number for the topology: as long as a DataNode's heartbeat response 
reports a different version, keep sending the topology until the versions match.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java:
##########
@@ -161,6 +162,13 @@ private TDataNodeHeartbeatReq genHeartbeatReq() {
       
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
     }
 
+    final Map<Integer, Set<Integer>> topologyMap =
+        configManager.getLoadManager().getLoadCache().getTopology();
+    if (topologyMap != null) {

Review Comment:
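   Please also consider the points raised in the comment above on 
`LoadCache#getTopology` (heartbeat failure and node restart) here.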



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            topologyExecutor, this::topologyProbing, 0, PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public void stopTopologyService() {

Review Comment:
   Same as the comment on `startTopologyService()`: please use `synchronized` 
and check `future` for null.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            topologyExecutor, this::topologyProbing, 0, PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public void stopTopologyService() {
+    future.cancel(true);
+    future = null;
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    final Map<TEndPoint, Integer> endPoint2IdMap = new HashMap<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+      endPoint2IdMap.put(location.getInternalEndPoint(), 
location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+    final List<TTestConnectionResult> results = new ArrayList<>();
+    dataNodeAsyncRequestContext
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp.isSetResultList()) {
+                results.addAll(resp.getResultList());
+              }
+            });
+
+    // 3. collect results and update the heartbeat timestamps
+    for (final TTestConnectionResult result : results) {
+      final int fromDataNodeId =
+          Optional.ofNullable(result.getSender().getDataNodeLocation())
+              .map(TDataNodeLocation::getDataNodeId)
+              .orElse(-1);
+      final int toDataNodeId =
+          
Optional.ofNullable(endPoint2IdMap.get(result.getServiceProvider().getEndPoint()))
+              .orElse(-1);
+      if (result.isSuccess()
+          && dataNodeIds.contains(fromDataNodeId)
+          && dataNodeIds.contains(toDataNodeId)) {
+        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are 
Dn-Dn internally. Here we
+        // just double-check.
+        heartbeats
+            .computeIfAbsent(new Pair<>(fromDataNodeId, toDataNodeId), p -> 
new ArrayList<>())
+            .add(new NodeHeartbeatSample(NodeStatus.Running));
+      }
+    }
+
+    // 4. use failure detector to identify potential network partitions
+    boolean partitionDetected = false;
+    final Map<Integer, Set<Integer>> latestTopology =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> 
new HashSet<>()));
+    for (final Map.Entry<Pair<Integer, Integer>, 
List<AbstractHeartbeatSample>> entry :
+        heartbeats.entrySet()) {
+      final int fromId = entry.getKey().getLeft();
+      final int toId = entry.getKey().getRight();
+      if (!entry.getValue().isEmpty() && 
!failureDetector.isAvailable(entry.getValue())) {
+        LOGGER.info("Connection from DataNode {} to DataNode {} is broken", 
fromId, toId);

Review Comment:
   This may produce too many logs: it is printed on every probing round for as 
long as the connection stays broken.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = 
Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newSet = new HashMap<>();
+    candidates.forEach(set -> newSet.put(set.getRegionId(), set));
+    final Map<TRegionReplicaSet, T> candidateMap = new HashMap<>();
+    for (final Map.Entry<TRegionReplicaSet, T> entry : input) {
+      final TConsensusGroupId gid = entry.getKey().getRegionId();
+      if (newSet.containsKey(gid)) {
+        candidateMap.put(newSet.get(gid), entry.getValue());
+      }
+    }
+    return candidateMap.entrySet();
+  }
+
+  public List<TRegionReplicaSet> 
getReachableCandidates(List<TRegionReplicaSet> all) {
+    if (!isPartitioned || all == null || all.isEmpty()) {
+      return all;
+    }
+    final Map<Integer, Set<Integer>> topologyMapCurrent =
+        Collections.unmodifiableMap(this.topologyMap);
+
+    // brute-force search to select DataNode candidates that can communicate 
to all
+    // TRegionReplicaSets
+    final List<Integer> dataNodeCandidates = new ArrayList<>();
+    for (final Integer datanode : topologyMapCurrent.keySet()) {
+      boolean reachableToAllSets = true;
+      final Set<Integer> datanodeReachableToThis = 
topologyMapCurrent.get(datanode);
+      for (final TRegionReplicaSet replicaSet : all) {
+        final List<Integer> replicaNodeLocations =
+            replicaSet.getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toList());
+        replicaNodeLocations.retainAll(datanodeReachableToThis);
+        reachableToAllSets = !replicaNodeLocations.isEmpty();
+      }
+      if (reachableToAllSets) {
+        dataNodeCandidates.add(datanode);
+      }
+    }
+
+    // select TRegionReplicaSet candidates whose DataNode Locations contain at 
least one
+    // allReachableDataNodes
+    final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
+    for (final TRegionReplicaSet replicaSet : all) {
+      final List<Integer> commonLocations =
+          replicaSet.getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList());
+      commonLocations.retainAll(dataNodeCandidates);
+      if (!commonLocations.isEmpty()) {
+        final List<TDataNodeLocation> validLocations =
+            
commonLocations.stream().map(dataNodes::get).collect(Collectors.toList());
+        final TRegionReplicaSet validCandidate =
+            new TRegionReplicaSet(replicaSet.getRegionId(), validLocations);
+        reachableSetCandidates.add(validCandidate);
+      }
+    }
+
+    return reachableSetCandidates;
+  }
+
+  public void updateTopology(
+      final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, 
Set<Integer>> latestTopology) {
+    this.dataNodes = dataNodes;
+    this.topologyMap = latestTopology;
+    if (latestTopology.get(myself) == null) {
+      // latest topology doesn't include this node information.
+      // This mostly happens when this node just starts and haven't report 
connection details.
+      this.isPartitioned = false;
+    } else {
+      this.isPartitioned = latestTopology.get(myself).size() != 
latestTopology.keySet().size();
+    }
+    LOGGER.info("[Topology] latest view from config-node: {}", latestTopology);

Review Comment:
   Could this cause too many logs?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =

Review Comment:
   Please use `synchronized` here and add a null check on `future`.
   <img width="785" alt="image" 
src="https://github.com/user-attachments/assets/7f64d936-feb1-4c7a-8b0d-de7dac19e080"
 />
   



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,

Review Comment:
   Please extract this magic number (60) into a static field so that it can be 
reused together with AbstractLoadCache.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;

Review Comment:
   Pay attention to the lifetime management of nodes. For example, you should 
delete the corresponding fields when nodes are removed.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+public class ReplicaSetUnreachableException extends RuntimeException {
+  public ReplicaSetUnreachableException(TRegionReplicaSet replicaSet) {
+    super(replicaSet.toString());

Review Comment:
   Please add a descriptive message.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            topologyExecutor, this::topologyProbing, 0, PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public void stopTopologyService() {
+    future.cancel(true);
+    future = null;
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    final Map<TEndPoint, Integer> endPoint2IdMap = new HashMap<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+      endPoint2IdMap.put(location.getInternalEndPoint(), 
location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);

Review Comment:
   Is this too small? It feels like even a little bit of GC would cause a timeout.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            topologyExecutor, this::topologyProbing, 0, PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public void stopTopologyService() {
+    future.cancel(true);
+    future = null;
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    final Map<TEndPoint, Integer> endPoint2IdMap = new HashMap<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+      endPoint2IdMap.put(location.getInternalEndPoint(), 
location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+    final List<TTestConnectionResult> results = new ArrayList<>();
+    dataNodeAsyncRequestContext
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp.isSetResultList()) {
+                results.addAll(resp.getResultList());
+              }
+            });
+
+    // 3. collect results and update the heartbeat timestamps
+    for (final TTestConnectionResult result : results) {
+      final int fromDataNodeId =
+          Optional.ofNullable(result.getSender().getDataNodeLocation())
+              .map(TDataNodeLocation::getDataNodeId)
+              .orElse(-1);
+      final int toDataNodeId =
+          
Optional.ofNullable(endPoint2IdMap.get(result.getServiceProvider().getEndPoint()))
+              .orElse(-1);
+      if (result.isSuccess()
+          && dataNodeIds.contains(fromDataNodeId)
+          && dataNodeIds.contains(toDataNodeId)) {
+        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are 
Dn-Dn internally. Here we
+        // just double-check.
+        heartbeats
+            .computeIfAbsent(new Pair<>(fromDataNodeId, toDataNodeId), p -> 
new ArrayList<>())

Review Comment:
   Could this grow without bound?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = 1_000L;
+  private final ScheduledExecutorService topologyExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+  private final IManager configManager;
+  private ScheduledFuture<?> future;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                60,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public void startTopologyService() {
+    future =
+        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+            topologyExecutor, this::topologyProbing, 0, PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public void stopTopologyService() {
+    future.cancel(true);
+    future = null;
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    final Map<TEndPoint, Integer> endPoint2IdMap = new HashMap<>();

Review Comment:
   How about just adding the DataNode ID to TestDNConnectionRPC?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.Collection;
+
+public class RootFIPlacementException extends RuntimeException {
+  public RootFIPlacementException(Collection<TRegionReplicaSet> replicaSets) {
+    super(replicaSets.toString());

Review Comment:
   Please add a descriptive message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to