Copilot commented on code in PR #17595:
URL: https://github.com/apache/iotdb/pull/17595#discussion_r3200561952
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -224,6 +257,14 @@ private synchronized void topologyProbing() {
final int fromId = entry.getKey().getLeft();
final int toId = entry.getKey().getRight();
+ if (!proberIds.contains(fromId)) {
+ // Not in this batch — carry forward as reachable if history is non-empty and last known OK
+ if (!entry.getValue().isEmpty()) {
+ Optional.ofNullable(latestTopology.get(fromId)).ifPresent(s -> s.add(toId));
+ }
+ continue;
Review Comment:
For non-prober (fromId) pairs, the code currently marks (fromId -> toId) as
reachable whenever there is any heartbeat history. This loses the previous
failure-detector outcome and can incorrectly flip a previously-detected broken
connection back to reachable just because old samples remain in the list.
Persist the last computed reachability per pair (or reuse the last topology)
and carry that forward for non-probers, rather than using
`!entry.getValue().isEmpty()` as the condition.
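A minimal sketch of the per-pair approach suggested above, assuming the failure detector produces a boolean verdict for each probed (fromId, toId) pair. `PairReachabilityCache`, `record`, and `carryForward` are hypothetical names, not part of the PR. Probed pairs would call `record(...)` with the fresh verdict, and pairs whose `fromId` is not in `proberIds` would call `carryForward(...)` instead of checking `!entry.getValue().isEmpty()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical cache of the last failure-detector verdict for each (fromId, toId) pair. */
final class PairReachabilityCache {
  // Key packs the ordered pair (fromId, toId) into one long; value is the last verdict.
  private final Map<Long, Boolean> lastVerdict = new HashMap<>();

  private static long key(int fromId, int toId) {
    return ((long) fromId << 32) | (toId & 0xFFFFFFFFL);
  }

  /** Store the verdict computed for a pair whose fromId was a prober in this cycle. */
  void record(int fromId, int toId, boolean reachable) {
    lastVerdict.put(key(fromId, toId), reachable);
  }

  /**
   * For a non-prober pair, re-add toId only if the last recorded verdict was "reachable".
   * Like the PR code, it only touches fromId's set if that set already exists.
   */
  void carryForward(int fromId, int toId, Map<Integer, Set<Integer>> latestTopology) {
    Set<Integer> reachableFrom = latestTopology.get(fromId);
    if (reachableFrom != null && Boolean.TRUE.equals(lastVerdict.get(key(fromId, toId)))) {
      reachableFrom.add(toId);
    }
  }
}
```

In this sketch a pair with no recorded verdict is not assumed reachable. Whether that is the right default for a newly joined DataNode is a design choice left to the PR.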
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -234,21 +275,68 @@ private synchronized void topologyProbing() {
logAsymmetricPartition(latestTopology);
- // 5. notify the listeners on topology change
+ // 9. notify the listeners on topology change
if (shouldRun.get()) {
topologyChangeListener.accept(latestTopology);
}
+
+ // 10. push topology changes to DataNodes
+ pushTopologyToDataNodes(latestTopology, dataNodeLocations);
}
/**
- * We only consider warning (one vs remaining) network partition. If we need to cover more
- * complicated scenarios like (many vs many) network partition, we shall use graph algorithms
- * then.
+ * Push topology changes to DataNodes via PUSH_TOPOLOGY request. Each DataNode receives only its
+ * own reachable set. lastPushedTopology is updated only on successful push.
*/
+ private void pushTopologyToDataNodes(
+ Map<Integer, Set<Integer>> latestTopology, List<TDataNodeLocation> dataNodeLocations) {
+ final Map<Integer, TDataNodeLocation> dataNodesMap =
+ dataNodeLocations.stream()
+ .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc));
+
+ final Map<Integer, TDataNodeLocation> targetMap = new HashMap<>();
+ for (final TDataNodeLocation location : dataNodeLocations) {
+ final int nodeId = location.getDataNodeId();
+ final Set<Integer> reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet());
+ final Set<Integer> lastPushed = lastPushedTopology.get(nodeId);
+ if (lastPushed != null && lastPushed.equals(reachableSet)) {
+ continue;
+ }
+ targetMap.put(nodeId, location);
+ }
+
+ if (targetMap.isEmpty()) {
+ return;
+ }
+
+ final DataNodeAsyncRequestContext<TUpdateClusterTopologyReq, TSStatus> context =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.PUSH_TOPOLOGY, targetMap);
+ for (final Map.Entry<Integer, TDataNodeLocation> entry : targetMap.entrySet()) {
+ final int nodeId = entry.getKey();
+ final Set<Integer> reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet());
+ final Map<Integer, Set<Integer>> perNodeTopology = new HashMap<>();
+ perNodeTopology.put(nodeId, new HashSet<>(reachableSet));
+ final TUpdateClusterTopologyReq req =
+ new TUpdateClusterTopologyReq(dataNodesMap, perNodeTopology);
+ context.putRequest(nodeId, req);
+ }
+
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequestWithTimeoutInMs(context, CONF.getTopologyProbingBaseIntervalInMs());
+
+ context
+ .getResponseMap()
+ .forEach(
+ (nodeId, resp) -> {
+ Set<Integer> reachableSet =
+ latestTopology.getOrDefault(nodeId, Collections.emptySet());
+ lastPushedTopology.put(nodeId, new HashSet<>(reachableSet));
Review Comment:
`lastPushedTopology` is updated for every response in
`context.getResponseMap()` without checking whether the TSStatus indicates
success. Because the handler populates the response map even on RPC
errors/non-success codes, this can record a failed push as if it succeeded and
prevent future retries to that DataNode. Only update `lastPushedTopology` when
`resp.getCode()` is SUCCESS_STATUS (and ideally keep/clear it on failures so
the next probing cycle retries).
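A minimal sketch of the success check, with each DataNode response reduced to a plain status code so the example is self-contained. `PushBookkeeping`, `onResponses`, and `needsPush` are hypothetical. `SUCCESS_CODE` is an assumed stand-in for `TSStatusCode.SUCCESS_STATUS.getStatusCode()`; in the real handler the check would read `resp.getCode()` on each `TSStatus`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical bookkeeping that records a pushed topology only after a successful ack. */
final class PushBookkeeping {
  // Assumed stand-in for TSStatusCode.SUCCESS_STATUS.getStatusCode().
  static final int SUCCESS_CODE = 200;

  private final Map<Integer, Set<Integer>> lastPushedTopology = new HashMap<>();

  /** Apply the per-DataNode status codes returned for one push round. */
  void onResponses(Map<Integer, Set<Integer>> latestTopology, Map<Integer, Integer> responseCodes) {
    responseCodes.forEach(
        (nodeId, code) -> {
          if (code != null && code == SUCCESS_CODE) {
            Set<Integer> reachable = latestTopology.getOrDefault(nodeId, Collections.emptySet());
            lastPushedTopology.put(nodeId, new HashSet<>(reachable));
          } else {
            // Failed or non-success push: forget the entry so the next cycle re-pushes.
            lastPushedTopology.remove(nodeId);
          }
        });
  }

  /** Mirrors the diff check in pushTopologyToDataNodes: push unless the last ack matches. */
  boolean needsPush(int nodeId, Set<Integer> reachable) {
    Set<Integer> lastPushed = lastPushedTopology.get(nodeId);
    return lastPushed == null || !lastPushed.equals(reachable);
  }
}
```

Clearing the entry on failure, rather than keeping the previous value, forces a re-push even if the topology later reverts to the last acknowledged set. That is the conservative choice, since a failed or timed-out RPC leaves the DataNode's actual state unknown.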
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java:
##########
@@ -2076,9 +2087,23 @@ public TTestConnectionResp submitTestConnectionTask(final TNodeLocations nodeLoc
@Override
public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations nodeLocations)
throws TException {
- return new TTestConnectionResp(
- new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
- testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+ try {
+ Future<TTestConnectionResp> future =
+ TOPOLOGY_PROBING_EXECUTOR.submit(
+ () ->
+ new TTestConnectionResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+ testAllDataNodeConnectionInHeartbeatChannel(
+ nodeLocations.getDataNodeLocations())));
+ return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ return new TTestConnectionResp(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage("Topology probing timed out after " + TEST_CONNECTION_TIMEOUT_MS + "ms"),
+ Collections.emptyList());
Review Comment:
On `TimeoutException`, the submitted probing task is left running in
`TOPOLOGY_PROBING_EXECUTOR` (the `Future` is not cancelled). This can
accumulate stuck tasks and tie up the limited executor threads even though the
RPC already returned a timeout to the caller. Cancel the future (e.g.,
`future.cancel(true)`) in the timeout path, and consider handling
`InterruptedException` separately to preserve the interrupt flag.
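A minimal sketch of the timeout path with cancellation and interrupt handling, written against a generic `ExecutorService` rather than `TOPOLOGY_PROBING_EXECUTOR`. `BoundedProbe.callWithTimeout` and its `fallback` parameter are hypothetical; in the PR the fallback would be the `EXECUTE_STATEMENT_ERROR` response.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: run a probe with a deadline without leaking the task on timeout. */
final class BoundedProbe {
  /** Returns the probe result, or {@code fallback} on timeout, failure, or interruption. */
  static <T> T callWithTimeout(
      ExecutorService executor, Callable<T> probe, long timeoutMs, T fallback) {
    Future<T> future = executor.submit(probe);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // Interrupt the still-running probe so it does not keep an executor thread busy.
      future.cancel(true);
      return fallback;
    } catch (InterruptedException e) {
      future.cancel(true);
      // Restore the caller's interrupt flag instead of swallowing it.
      Thread.currentThread().interrupt();
      return fallback;
    } catch (ExecutionException e) {
      // The probe itself threw; report the fallback rather than propagating.
      return fallback;
    }
  }
}
```

Cancellation is cooperative: `future.cancel(true)` only interrupts the worker thread, so a probe blocked in non-interruptible socket I/O still holds its thread until its own socket timeout fires.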
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java:
##########
@@ -153,36 +154,70 @@ public void run() {
}
}
+ /**
+ * Select sqrt(N) DataNodes as probers, rotating through all DataNodes across cycles so that every
+ * DataNode gets to be a prober over sqrt(N) cycles.
+ */
+ private List<TDataNodeLocation> selectProbers(List<TDataNodeLocation> allDataNodes) {
+ int n = allDataNodes.size();
+ if (n <= 1) {
+ return allDataNodes;
+ }
+ int sqrtN = (int) Math.ceil(Math.sqrt(n));
+ List<TDataNodeLocation> sorted = new ArrayList<>(allDataNodes);
+ sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId));
+ int startIndex = (proberRotationIndex * sqrtN) % n;
+ proberRotationIndex++;
+ List<TDataNodeLocation> probers = new ArrayList<>(sqrtN);
+ for (int i = 0; i < sqrtN && i < n; i++) {
+ probers.add(sorted.get((startIndex + i) % n));
+ }
+ return probers;
+ }
+
private synchronized void topologyProbing() {
- // 1. get the latest datanode list
+ // 1. get Running DataNodes only
final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
final Set<Integer> dataNodeIds = new HashSet<>();
for (final TDataNodeConfiguration dataNodeConf :
configManager.getNodeManager().getRegisteredDataNodes()) {
final TDataNodeLocation location = dataNodeConf.getLocation();
- if (startingDataNodes.contains(location.getDataNodeId())) {
- continue; // we shall wait for internal endpoint to be ready
+ if (configManager.getLoadManager().getNodeStatus(location.getDataNodeId())
+ != NodeStatus.Running) {
+ continue;
}
dataNodeLocations.add(location);
dataNodeIds.add(location.getDataNodeId());
}
- // 2. send the verify connection RPC to all datanodes
+ // 2. compute probing timeout
+ final long timeout =
+ (long) (CONF.getTopologyProbingBaseIntervalInMs() * CONF.getTopologyProbingTimeoutRatio());
+
Review Comment:
`timeout` is computed directly from `topology_probing_timeout_ratio` without
validating its range. If the ratio is <= 0, the timeout is zero or negative (immediate
timeouts), and if it is >= 1, the probing timeout equals or exceeds the interval
(defeating the purpose of bounding it). Consider validating or clamping the ratio
(e.g., (0,1)) when loading config or before computing the timeout, and ensure a
minimum positive timeout.
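A minimal sketch of validating the ratio before computing the timeout. `ProbingTimeout.compute`, `DEFAULT_RATIO`, and `MIN_TIMEOUT_MS` are hypothetical, and both fallback values are assumptions, not IoTDB defaults. Out-of-range or NaN ratios fall back to a default rather than being clamped to an endpoint, because both endpoints of (0,1) are themselves invalid.

```java
/** Hypothetical helper that derives a bounded probing timeout from the interval and ratio. */
final class ProbingTimeout {
  // Assumed fallback values for illustration only.
  static final double DEFAULT_RATIO = 0.5;
  static final long MIN_TIMEOUT_MS = 1L;

  static long compute(long intervalMs, double ratio) {
    // Accept only ratios strictly inside (0, 1); NaN fails both comparisons.
    double effective = (ratio > 0.0 && ratio < 1.0) ? ratio : DEFAULT_RATIO;
    // Truncate like the PR's (long) cast, but never return a non-positive timeout.
    return Math.max(MIN_TIMEOUT_MS, (long) (intervalMs * effective));
  }
}
```

With the effective ratio strictly below 1, the result stays below the interval for realistic interval values, and `MIN_TIMEOUT_MS` only matters for very short intervals. The same check could instead run once when the configuration is loaded, with a warning logged for rejected values.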
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java:
##########
@@ -1778,6 +1780,18 @@ public TSStatus setConfiguration(TSetConfigurationReq req) {
return RpcUtils.squashResponseStatusList(statusList);
}
+ private void handleTopologyProbingHotReload(boolean wasEnabled) {
+ boolean isEnabled = CONF.isEnableTopologyProbing();
+ if (wasEnabled == isEnabled) {
+ return;
+ }
+ if (isEnabled && getConsensusManager().isLeader()) {
+ getLoadManager().startTopologyService();
+ } else if (!isEnabled) {
+ getLoadManager().stopTopologyService();
+ }
Review Comment:
The PR description says the probing thread "stays alive when disabled and
resumes on re-enable", but hot-reload currently calls `stopTopologyService()`
which cancels the Future and clears heartbeat history (and
`startTopologyService()` later submits a new task). Either adjust the
implementation to pause/resume without cancelling/clearing state, or update the
PR description/behavior so the hot-reload semantics are consistent.
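If the first option is chosen, one way to pause and resume without cancelling is to keep the periodic task scheduled and have each round return early while disabled, much like the existing `shouldRun` check before notifying listeners. `PausableProbing`, `runOnce`, and `onHotReload` are hypothetical, and the counter stands in for the heartbeat history that `stopTopologyService()` currently clears.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical periodic task that is paused by a flag instead of being cancelled. */
final class PausableProbing {
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  // Stand-in for accumulated state such as heartbeat history; it survives a pause.
  private final AtomicInteger rounds = new AtomicInteger();

  /** Body of the scheduled task; the scheduled Future itself is never cancelled. */
  void runOnce() {
    if (!enabled.get()) {
      return; // paused: skip this round but keep the accumulated state
    }
    rounds.incrementAndGet();
  }

  /** Called on hot reload in place of stop/start. */
  void onHotReload(boolean isEnabled) {
    enabled.set(isEnabled);
  }

  int rounds() {
    return rounds.get();
  }
}
```

With this approach `handleTopologyProbingHotReload` would toggle the flag rather than calling `stopTopologyService()` and `startTopologyService()`, while leadership changes would presumably keep their own start/stop path.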
--
This is an automated message from the Apache Git Service.