Copilot commented on code in PR #17821:
URL: https://github.com/apache/iotdb/pull/17821#discussion_r3370485156
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java:
##########
@@ -89,46 +89,55 @@ public void onComplete(TDataNodeHeartbeatResp
heartbeatResp) {
RegionStatus regionStatus =
RegionStatus.valueOf(heartbeatResp.getStatus());
- heartbeatResp
- .getJudgedLeaders()
- .forEach(
- (regionGroupId, isLeader) -> {
-
- // Do not allow regions to inherit the Removing state from
datanode
- RegionStatus nextRegionStatus = regionStatus;
- if (nextRegionStatus == RegionStatus.Removing) {
- nextRegionStatus =
- loadManager
- .getLoadCache()
- .getRegionCacheLastSampleStatus(regionGroupId, nodeId);
- }
-
- // Update RegionGroupCache
- loadManager
- .getLoadCache()
- .cacheRegionHeartbeatSample(
- regionGroupId,
- nodeId,
- new RegionHeartbeatSample(
- heartbeatResp.getHeartbeatTimestamp(),
- // Region will inherit DataNode's status
- nextRegionStatus),
- false);
-
- if
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
- && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
- ||
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
- && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE))
- && Boolean.TRUE.equals(isLeader)) {
- // Update ConsensusGroupCache when necessary
- loadManager
- .getLoadCache()
- .cacheConsensusSample(
- regionGroupId,
- new ConsensusGroupHeartbeatSample(
-
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
- }
- });
+ Map<TConsensusGroupId, Boolean> judgedLeaders =
+ heartbeatResp.isSetJudgedLeaders()
+ ? heartbeatResp.getJudgedLeaders()
+ : Collections.emptyMap();
+ judgedLeaders.forEach(
+ (regionGroupId, isLeader) -> {
+
+ // Do not allow regions to inherit the Removing state from datanode
+ RegionStatus nextRegionStatus = regionStatus;
+ if (nextRegionStatus == RegionStatus.Removing) {
+ nextRegionStatus =
+
loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId,
nodeId);
+ }
+
+ // Update RegionGroupCache
+ loadManager
+ .getLoadCache()
+ .cacheRegionHeartbeatSample(
+ regionGroupId,
+ nodeId,
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ // Region will inherit DataNode's status
+ nextRegionStatus),
+ false);
+
+ boolean shouldCacheConsensusSample =
+ (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
+ && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
+ ||
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+ && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE);
+ long logicalTimestamp =
+ heartbeatResp.isSetConsensusLogicalTimeMap()
+ &&
heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId)
+ ?
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId)
+ : heartbeatResp.getHeartbeatTimestamp();
+ loadManager
+ .getLoadCache()
+ .cacheConsensusGroupHeartbeatSample(
+ regionGroupId,
+ nodeId,
+ Boolean.TRUE.equals(isLeader),
+ logicalTimestamp,
+ shouldCacheConsensusSample);
Review Comment:
`logicalTimestamp` falls back to `heartbeatResp.getHeartbeatTimestamp()`
when the consensus logical clock is missing for a given `regionGroupId`.
Because `ConsensusGroupHeartbeatSample` requires a monotonically increasing
`sampleLogicalTimestamp`, mixing `getLogicalClock()` values (often small
indices) with `System.nanoTime()`-based heartbeat timestamps can permanently
block later valid samples. Once a large nanoTime-based value has been cached,
every subsequent logical-clock sample compares as older and is discarded as
out-of-order. This can happen if `judgedLeaders` and `consensusLogicalTimeMap`
are built from slightly different snapshots, so that a newly added group
appears in one map but not the other.
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java:
##########
@@ -364,6 +373,69 @@ public void cacheConsensusSample(
.ifPresent(group -> group.cacheHeartbeatSample(sample));
}
+ public void cacheConsensusGroupHeartbeatSample(
+ TConsensusGroupId regionGroupId,
+ int nodeId,
+ boolean isLeader,
+ long logicalTimestamp,
+ boolean cacheLeader) {
+ consensusGroupHeartbeatSampledNodeMap
+ .computeIfAbsent(regionGroupId, empty -> ConcurrentHashMap.newKeySet())
+ .add(nodeId);
+ if (cacheLeader && isLeader) {
+ cacheConsensusSample(
+ regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp,
nodeId));
+ } else if (isConsensusGroupHeartbeatFullySampled(regionGroupId)
+ && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId))
+ .map(AbstractLoadCache::hasHeartbeatSample)
+ .orElse(false)) {
+ cacheConsensusSample(
+ regionGroupId,
+ new ConsensusGroupHeartbeatSample(
+ logicalTimestamp, ConsensusGroupCache.UN_READY_LEADER_ID));
+ }
Review Comment:
`cacheConsensusGroupHeartbeatSample` may cache an `UN_READY_LEADER_ID`
sample even when `cacheLeader` is false. As a result, callers that
intentionally disable consensus caching (or disable it because logical clocks
are missing) still mutate `consensusGroupCacheMap`. This can also reintroduce
the ordering problem described above if the provided `logicalTimestamp` is only
a placeholder. Guard the fallback `UN_READY_LEADER_ID` caching with
`cacheLeader` as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]