Pengzna commented on code in PR #14910:
URL: https://github.com/apache/iotdb/pull/14910#discussion_r1971128381
##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java:
##########
@@ -246,46 +276,107 @@ private void balanceRegionLeader(
getLoadManager().forceUpdateConsensusGroupCache(successTransferMap);
- invalidateSchemaCacheOfOldLeaders(currentLeaderMap,
successTransferMap.keySet());
+ // Prepare data for invalidSchemaCacheOfOldLeaders
+ switch (regionGroupType) {
+ case SchemaRegion:
+ lastBalancedSchemaRegionSet = successTransferMap.keySet();
+ lastSchemaRegion2OldLeaderMap = currentLeaderMap;
+ break;
+ case DataRegion:
+ lastBalancedDataRegionSet = successTransferMap.keySet();
+ lastDataRegion2OldLeaderMap = currentLeaderMap;
+ break;
+ }
}
- private void invalidateSchemaCacheOfOldLeaders(
- final Map<TConsensusGroupId, Integer> oldLeaderMap,
- final Set<TConsensusGroupId> successTransferSet) {
- final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
- new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
- final AtomicInteger requestIndex = new AtomicInteger(0);
- oldLeaderMap.entrySet().stream()
- .filter(entry -> TConsensusGroupType.DataRegion ==
entry.getKey().getType())
- .filter(entry -> successTransferSet.contains(entry.getKey()))
- .forEach(
- entry -> {
- // set target
- final Integer dataNodeId = entry.getValue();
- if (dataNodeId == -1) {
- return;
- }
- final TDataNodeLocation dataNodeLocation =
-
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
- if (dataNodeLocation == null) {
- LOGGER.warn("DataNodeLocation is null, datanodeId {}",
dataNodeId);
- return;
- }
- invalidateSchemaCacheRequestHandler.putNodeLocation(
- requestIndex.get(), dataNodeLocation);
- // set req
- final TConsensusGroupId consensusGroupId = entry.getKey();
- final String database =
getPartitionManager().getRegionDatabase(consensusGroupId);
-
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
- requestIndex.incrementAndGet();
- });
- CnToDnInternalServiceAsyncRequestManager.getInstance()
- .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
+ private void invalidateSchemaCacheOfOldLeaders() {
+ BiConsumer<Map<TConsensusGroupId, Integer>, Set<TConsensusGroupId>>
consumer =
+ (oldLeaderMap, successTransferSet) -> {
+ final DataNodeAsyncRequestContext<String, TSStatus>
invalidateSchemaCacheRequestHandler =
+ new
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.INVALIDATE_LAST_CACHE);
+ final AtomicInteger requestIndex = new AtomicInteger(0);
+ oldLeaderMap.entrySet().stream()
+ .filter(entry -> TConsensusGroupType.DataRegion ==
entry.getKey().getType())
+ .filter(entry -> successTransferSet.contains(entry.getKey()))
+ .forEach(
+ entry -> {
+ // set target
+ final Integer dataNodeId = entry.getValue();
+ if (dataNodeId == -1) {
+ return;
+ }
+ final TDataNodeLocation dataNodeLocation =
+
getNodeManager().getRegisteredDataNode(dataNodeId).getLocation();
+ if (dataNodeLocation == null) {
+ LOGGER.warn("DataNodeLocation is null, datanodeId {}",
dataNodeId);
+ return;
+ }
+ invalidateSchemaCacheRequestHandler.putNodeLocation(
+ requestIndex.get(), dataNodeLocation);
+ // set req
+ final TConsensusGroupId consensusGroupId = entry.getKey();
+ final String database =
+
getPartitionManager().getRegionDatabase(consensusGroupId);
+
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
+ requestIndex.incrementAndGet();
+ });
+ CnToDnInternalServiceAsyncRequestManager.getInstance()
+ .sendAsyncRequest(invalidateSchemaCacheRequestHandler);
+ };
+
+ if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) {
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]