OneSizeFitsQuorum commented on code in PR #15014:
URL: https://github.com/apache/iotdb/pull/15014#discussion_r2002378386


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java:
##########
@@ -50,7 +50,7 @@ public synchronized void updateCurrentStatistics(boolean 
forceUpdate) {
       if (lastSample == null) {
         /* First heartbeat not received from this region, status is UNKNOWN */
         status = RegionStatus.Unknown;
-      } else if (!failureDetector.isAvailable(history)) {

Review Comment:
   Modify the `RegionCache` constructor to accept the `consensusGroupId` and use that as the key. Otherwise the hashmap key is derived from the `RegionCache` object itself, and if that object is ever replaced, the map's semantics will no longer be correct.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public synchronized void startTopologyService() {
+    shouldRun.set(true);
+    topologyThread.submit(this);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public synchronized void stopTopologyService() {
+    shouldRun.set(false);
+    topologyThread.shutdown();

Review Comment:
   Maybe we don't need to shut down the thread pool here; instead, ensure the in-flight task completes, allowing up to 60 seconds before the thread is recycled.
   
   Careful design is also required to guarantee that this single-threaded pool holds at most one task when a leader switch occurs.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = 
Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newSet = new HashMap<>();

Review Comment:
   Rename `newSet` to `newMap`? The variable holds a `Map`, so the current name is misleading.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public synchronized void startTopologyService() {
+    shouldRun.set(true);
+    topologyThread.submit(this);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public synchronized void stopTopologyService() {
+    shouldRun.set(false);
+    topologyThread.shutdown();
+    try {
+      topologyThread.awaitTermination(PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  /**
+   * Schedule the {@link #topologyProbing} task either: 1. every 
PROBING_INTERVAL_MS interval. 2.
+   * Manually triggered by outside events (node restart / register, etc.).
+   */
+  private void mayWait() {
+    try {
+      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void run() {
+    for (; shouldRun.get(); mayWait()) {
+      topologyProbing();
+    }
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      if (startingDataNodes.contains(location.getDataNodeId())) {
+        continue; // we shall wait for internal endpoint to be ready
+      }
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+    final List<TTestConnectionResult> results = new ArrayList<>();
+    dataNodeAsyncRequestContext
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp.isSetResultList()) {
+                results.addAll(resp.getResultList());
+              }
+            });
+
+    // 3. collect results and update the heartbeat timestamps
+    for (final TTestConnectionResult result : results) {
+      final int fromDataNodeId =
+          Optional.ofNullable(result.getSender().getDataNodeLocation())
+              .map(TDataNodeLocation::getDataNodeId)
+              .orElse(-1);
+      final int toDataNodeId = result.getServiceProvider().getNodeId();
+      if (result.isSuccess()
+          && dataNodeIds.contains(fromDataNodeId)
+          && dataNodeIds.contains(toDataNodeId)) {
+        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are 
Dn-Dn internally. Here we
+        // just double-check.
+        final List<AbstractHeartbeatSample> heartbeatHistory =
+            heartbeats.computeIfAbsent(
+                new Pair<>(fromDataNodeId, toDataNodeId), p -> new 
LinkedList<>());
+        heartbeatHistory.add(new NodeHeartbeatSample(NodeStatus.Running));
+        if (heartbeatHistory.size() > SAMPLING_WINDOW_SIZE) {
+          heartbeatHistory.remove(0);
+        }
+      }
+    }
+
+    // 4. use failure detector to identify potential network partitions
+    final Map<Integer, Set<Integer>> latestTopology =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> 
new HashSet<>()));
+    for (final Map.Entry<Pair<Integer, Integer>, 
List<AbstractHeartbeatSample>> entry :
+        heartbeats.entrySet()) {
+      final int fromId = entry.getKey().getLeft();
+      final int toId = entry.getKey().getRight();
+      if (!entry.getValue().isEmpty()
+          && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
+        LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", 
fromId, toId);
+      } else {
+        latestTopology.get(fromId).add(toId);
+      }
+    }
+
+    // 5. notify the listeners on topology change
+    topologyChangeListener.accept(latestTopology);
+  }
+
+  /** We only listen to datanode remove / restart / register events */
+  @Override
+  public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+    final Set<Integer> datanodeIds =
+        
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
+    final Map<Integer, Pair<NodeStatistics, NodeStatistics>> changes =
+        event.getDifferentNodeStatisticsMap();
+    for (final Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry :
+        changes.entrySet()) {
+      final Integer nodeId = entry.getKey();
+      final Pair<NodeStatistics, NodeStatistics> changeEvent = 
entry.getValue();
+      if (!datanodeIds.contains(nodeId)) {
+        continue;
+      }
+      if (changeEvent.getLeft() == null) {
+        // if a new datanode registered, DO NOT trigger probing immediately
+        startingDataNodes.add(nodeId);
+        continue;
+      } else {
+        startingDataNodes.remove(nodeId);
+      }
+
+      if (changeEvent.getRight() == null) {
+        // datanode removed from cluster, clean up probing history
+        final Set<Pair<Integer, Integer>> toRemove =
+            heartbeats.keySet().stream()
+                .filter(
+                    pair ->
+                        Objects.equals(pair.getLeft(), nodeId)
+                            || Objects.equals(pair.getRight(), nodeId))
+                .collect(Collectors.toSet());
+        toRemove.forEach(heartbeats::remove);

Review Comment:
   Does this need concurrency safety? `heartbeats` is a plain `HashMap` that is also mutated by the probing thread in `topologyProbing`, so removing entries here from a subscriber callback could race with it.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public synchronized void startTopologyService() {
+    shouldRun.set(true);
+    topologyThread.submit(this);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public synchronized void stopTopologyService() {
+    shouldRun.set(false);
+    topologyThread.shutdown();
+    try {
+      topologyThread.awaitTermination(PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  /**
+   * Schedule the {@link #topologyProbing} task either: 1. every 
PROBING_INTERVAL_MS interval. 2.
+   * Manually triggered by outside events (node restart / register, etc.).
+   */
+  private void mayWait() {
+    try {
+      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void run() {
+    for (; shouldRun.get(); mayWait()) {
+      topologyProbing();
+    }
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      if (startingDataNodes.contains(location.getDataNodeId())) {
+        continue; // we shall wait for internal endpoint to be ready
+      }
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode

Review Comment:
   Typo in the comment: "all datanode" → "all datanodes".



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java:
##########
@@ -207,7 +208,6 @@ public GenericKeyedObjectPool<TEndPoint, 
AsyncDataNodeInternalServiceClient> cre
                       
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                      .setPrintLogWhenEncounterException(false)

Review Comment:
   Should this line be kept? Removing `.setPrintLogWhenEncounterException(false)` changes the client's exception-logging behavior.



##########
integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java:
##########
@@ -1706,6 +1709,11 @@ public static void restartDataNodes() {
     long retryIntervalMS = 1000;
     while (true) {
       try (Connection connection = EnvFactory.getEnv().getConnection()) {
+        final List<BaseNodeWrapper> allDataNodes =

Review Comment:
   We can remove this logic later, once the follow-up retry PR is merged, so that the test reflects our RTO/RPO optimization.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java:
##########
@@ -53,7 +53,7 @@ public void onComplete(TSStatus response) {
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Remove only if success
       nodeLocationMap.remove(requestId);
-      LOGGER.info("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
+      LOGGER.debug("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);

Review Comment:
   Do we need dedicated clients and handlers for test-connection tasks so that we don't keep logging about down nodes, whether the requests succeed or fail? At the moment it looks like a node that is down will be logged continuously; perhaps a new parameter could be added to suppress this.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;

Review Comment:
   Should this use an AtomicReference instead of volatile?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public synchronized void startTopologyService() {
+    shouldRun.set(true);
+    topologyThread.submit(this);

Review Comment:
   The startTopologyService and stopTopologyService functions may be called 
repeatedly when the leader switches. It looks like calling start again after 
shutdown would raise an error?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = 
Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newSet = new HashMap<>();
+    candidates.forEach(set -> newSet.put(set.getRegionId(), set));
+    final Map<TRegionReplicaSet, T> candidateMap = new HashMap<>();
+    for (final Map.Entry<TRegionReplicaSet, T> entry : input) {
+      final TConsensusGroupId gid = entry.getKey().getRegionId();
+      if (newSet.containsKey(gid)) {
+        candidateMap.put(newSet.get(gid), entry.getValue());
+      }
+    }
+    return candidateMap.entrySet();
+  }
+
+  public List<TRegionReplicaSet> 
getReachableCandidates(List<TRegionReplicaSet> all) {
+    if (!isPartitioned || all == null || all.isEmpty()) {
+      return all;
+    }
+    final Map<Integer, Set<Integer>> topologyMapCurrent =
+        Collections.unmodifiableMap(this.topologyMap);
+
+    // brute-force search to select DataNode candidates that can communicate 
to all
+    // TRegionReplicaSets
+    final List<Integer> dataNodeCandidates = new ArrayList<>();
+    for (final Integer datanode : topologyMapCurrent.keySet()) {
+      boolean reachableToAllSets = true;
+      final Set<Integer> datanodeReachableToThis = 
topologyMapCurrent.get(datanode);
+      for (final TRegionReplicaSet replicaSet : all) {
+        final List<Integer> replicaNodeLocations =
+            replicaSet.getDataNodeLocations().stream()
+                .map(TDataNodeLocation::getDataNodeId)
+                .collect(Collectors.toList());
+        replicaNodeLocations.retainAll(datanodeReachableToThis);
+        reachableToAllSets = !replicaNodeLocations.isEmpty();
+      }
+      if (reachableToAllSets) {
+        dataNodeCandidates.add(datanode);
+      }
+    }
+
+    // select TRegionReplicaSet candidates whose DataNode Locations contain at 
least one
+    // allReachableDataNodes
+    final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
+    for (final TRegionReplicaSet replicaSet : all) {
+      final List<Integer> commonLocations =
+          replicaSet.getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList());
+      commonLocations.retainAll(dataNodeCandidates);
+      if (!commonLocations.isEmpty()) {
+        final List<TDataNodeLocation> validLocations =
+            
commonLocations.stream().map(dataNodes::get).collect(Collectors.toList());
+        final TRegionReplicaSet validCandidate =
+            new TRegionReplicaSet(replicaSet.getRegionId(), validLocations);
+        reachableSetCandidates.add(validCandidate);
+      }
+    }
+
+    return reachableSetCandidates;
+  }
+
+  public void updateTopology(
+      final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, 
Set<Integer>> latestTopology) {
+    if (!latestTopology.equals(topologyMap)) {
+      LOGGER.info("[Topology] latest view from config-node: {}", 
latestTopology);
+    }
+    this.dataNodes = dataNodes;
+    this.topologyMap = latestTopology;
+    if (latestTopology.get(myself) == null || 
latestTopology.get(myself).isEmpty()) {
+      // latest topology doesn't include this node information.
+      // This mostly happens when this node just starts and haven't report 
connection details.
+      this.isPartitioned = false;
+    } else {
+      this.isPartitioned = latestTopology.get(myself).size() != 
latestTopology.keySet().size();
+    }
+    if (isPartitioned && LOGGER.isDebugEnabled()) {
+      final Set<Integer> allDataLocations = new 
HashSet<>(latestTopology.keySet());
+      allDataLocations.removeAll(latestTopology.get(myself));
+      final String partitioned =
+          allDataLocations.stream()
+              .collect(
+                  StringBuilder::new, (sb, id) -> sb.append(",").append(id), 
StringBuilder::append)
+              .toString();
+      LOGGER.debug("This DataNode {} is partitioned with [{}]", myself, 
partitioned);
+    }
+  }
+
+  private ClusterTopology() {
+    this.myself =
+        
IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId();
+    this.isPartitioned = false;
+    this.topologyMap = null;

Review Comment:
   Try to use Collections.emptyMap() to initialize topologyMap and dataNodes.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> 
heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
+
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> 
topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new HashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // here we use the same failure
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 
1000_000L);
+    }
+  }
+
+  public synchronized void startTopologyService() {
+    shouldRun.set(true);
+    topologyThread.submit(this);
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  public synchronized void stopTopologyService() {
+    shouldRun.set(false);
+    topologyThread.shutdown();
+    try {
+      topologyThread.awaitTermination(PROBING_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  /**
+   * Schedule the {@link #topologyProbing} task either: 1. every 
PROBING_INTERVAL_MS interval. 2.
+   * Manually triggered by outside events (node restart / register, etc.).
+   */
+  private void mayWait() {
+    try {
+      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void run() {
+    for (; shouldRun.get(); mayWait()) {
+      topologyProbing();
+    }
+  }
+
+  private void topologyProbing() {
+    // 1. get the latest datanode list
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    for (final TDataNodeConfiguration dataNodeConf :
+        configManager.getNodeManager().getRegisteredDataNodes()) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      if (startingDataNodes.contains(location.getDataNodeId())) {
+        continue; // we shall wait for internal endpoint to be ready
+      }
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodes().stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, 
location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, 
PROBING_TIMEOUT_MS);
+    final List<TTestConnectionResult> results = new ArrayList<>();
+    dataNodeAsyncRequestContext
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp.isSetResultList()) {
+                results.addAll(resp.getResultList());
+              }
+            });
+
+    // 3. collect results and update the heartbeat timestamps
+    for (final TTestConnectionResult result : results) {
+      final int fromDataNodeId =
+          Optional.ofNullable(result.getSender().getDataNodeLocation())
+              .map(TDataNodeLocation::getDataNodeId)
+              .orElse(-1);
+      final int toDataNodeId = result.getServiceProvider().getNodeId();
+      if (result.isSuccess()
+          && dataNodeIds.contains(fromDataNodeId)
+          && dataNodeIds.contains(toDataNodeId)) {
+        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are 
Dn-Dn internally. Here we
+        // just double-check.
+        final List<AbstractHeartbeatSample> heartbeatHistory =
+            heartbeats.computeIfAbsent(
+                new Pair<>(fromDataNodeId, toDataNodeId), p -> new 
LinkedList<>());
+        heartbeatHistory.add(new NodeHeartbeatSample(NodeStatus.Running));
+        if (heartbeatHistory.size() > SAMPLING_WINDOW_SIZE) {
+          heartbeatHistory.remove(0);
+        }
+      }
+    }
+
+    // 4. use failure detector to identify potential network partitions
+    final Map<Integer, Set<Integer>> latestTopology =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> 
new HashSet<>()));
+    for (final Map.Entry<Pair<Integer, Integer>, 
List<AbstractHeartbeatSample>> entry :
+        heartbeats.entrySet()) {
+      final int fromId = entry.getKey().getLeft();
+      final int toId = entry.getKey().getRight();
+      if (!entry.getValue().isEmpty()
+          && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
+        LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", 
fromId, toId);

Review Comment:
   Can we add logic that distinguishes symmetric from asymmetric network 
partitions (especially asymmetric ones, because symmetric network partitions 
behave similarly to node crashes and already produce logs)? Otherwise, it would 
be hard to tell what kind of network failure occurred on the cluster by simply 
printing these one-to-one logs.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;

Review Comment:
   Should this use an AtomicReference instead of volatile?



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java:
##########
@@ -131,6 +131,7 @@ public GenericKeyedObjectPool<TEndPoint, 
AsyncDataNodeInternalServiceClient> cre
                       
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                      .setPrintLogWhenEncounterException(false)

Review Comment:
   Why do we still add this line for now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to