Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087036696

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }

+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment),
+   * if batching is enabled, the instances assigned for the same partitionId can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a
+   * given partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping
+   * from partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance assignments'.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but
+   *       this is not enforced in the code today. Once enforcement is added, the routing side and assignment side
+   *       will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of
+   *       instances. Special handling to check each group of assigned instances can be removed in that case. The
+   *       strict replica group routing can also be utilized for OFFLINE tables, thus StrictRealtimeSegmentAssignment
+   *       also needs to be made more generic for the OFFLINE case.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
-    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+    return enableStrictReplicaGroup
+        ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode,
+            batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
         : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-            lowDiskMode);
+            lowDiskMode, batchSizePerServer);
   }

   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+        partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
+      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
+      // and a dummy set for the assigned instances.
+      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
+    } else {
+      partitionIdToAssignedInstancesToCurrentAssignmentMap =
+          getPartitionIdToAssignedIn
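To make the grouping that the new Javadoc describes concrete, here is a minimal, self-contained sketch of building a partitionId -> assigned-instance-set -> current-assignment map. It is not the PR's actual helper: the PartitionIdFetcher shape, the method name, and the caching behavior are assumptions inferred from the quoted hunk.

```java
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class PartitionGroupingSketch {

  /** Hypothetical stand-in for the PR's PartitionIdFetcher: resolves a segment name to its partition id. */
  @FunctionalInterface
  public interface PartitionIdFetcher {
    int fetch(String segmentName);
  }

  /**
   * Groups the current assignment first by partitionId and then by the exact set of instances hosting each
   * segment, i.e. the partitionId -> unique set of instance assignments -> currentAssignment mapping described
   * in the Javadoc. The segmentPartitionIdMap caches partition ids so repeated rebalance steps don't re-derive
   * them (e.g. from the segment name or ZK metadata).
   */
  static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> groupByPartitionIdAndInstances(
      Map<String, Map<String, String>> currentAssignment, Object2IntOpenHashMap<String> segmentPartitionIdMap,
      PartitionIdFetcher partitionIdFetcher) {
    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> grouped = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      int partitionId;
      if (segmentPartitionIdMap.containsKey(segmentName)) {
        partitionId = segmentPartitionIdMap.getInt(segmentName);
      } else {
        partitionId = partitionIdFetcher.fetch(segmentName);
        segmentPartitionIdMap.put(segmentName, partitionId);
      }
      // Copy the key set so the grouping key stays stable even if the instance-state map changes later
      grouped.computeIfAbsent(partitionId, k -> new HashMap<>())
          .computeIfAbsent(new TreeSet<>(instanceStateMap.keySet()), k -> new TreeMap<>())
          .put(segmentName, instanceStateMap);
    }
    return grouped;
  }
}
```

When batching is disabled, the quoted hunk skips this grouping entirely and places the whole current assignment under partitionId 0 with a dummy instance set, so the rest of the logic can treat both paths uniformly.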
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2086941117

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087034298

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087037492

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2086296902

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085993960

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085595949

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085026933

## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java: ##

@@ -1438,4 +1464,1011 @@ public void testIsExternalViewConverged() {
       }
     }
   }
+
+  @Test
+  public void testAssignmentWithServerBatching() {
+    // Using LLC segment naming so that batching can be tested for both non-strict replica group and strict replica
+    // group based assignment. Otherwise, the partitionId might have to be fetched from ZK SegmentMetadata which will
+    // not exist since these aren't actual tables
+    // Current assignment:
+    // {
+    //   "segment__1__0__9834786L": { "host1": "ONLINE", "host2": "ONLINE", "host3": "ONLINE" },
+    //   "segment__2__0__9834786L": { "host2": "ONLINE", "host3": "ONLINE", "host4": "ONLINE" },
+    //   "segment__3__0__9834786L": { "host1": "ONLINE", "host2": "ONLINE", "host3": "ONLINE" },
+    //   "segment__4__0__9834786L": { "host2": "ONLINE", "host3": "ONLINE", "host4": "ONLINE" }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment.put("segment__1__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment__2__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment.put("segment__3__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment__4__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment 1:
+    // {
+    //   "segment__1__0__9834786L": { "host1": "ONLINE", "host3": "ONLINE", "host5": "ONLINE" },
+    //   "segment__2__0__9834786L": { "host2": "ONLINE", "host4": "ONLINE", "host6": "ONLINE" },
+    //   "segment__3__0__9834786L": { "host1": "ONLINE", "host3": "ONLINE", "host5": "ONLINE" },
+    //   "segment__4__0__9834786L": { "host2": "ONLINE", "host4": "ONLINE", "host6": "ONLINE" }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment__2__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment.put("segment__3__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment__4__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+    // Number of segments to offload:
+    // { "host1": 0, "host2": 2, "host3": 2, "host4": 0, "host5": -2, "host6": -2 }
+    Map<String, Integer> numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target
+    // assignment after two steps. Batch size = 1, unique partitionIds
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+              1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      assertNotEquals(nextAssignment, targetAssignment);
+      assertEquals(nextAssignment.get("segment__1__0__9834786L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+      assertEquals(nextAssignment.get("segment__2__0__9834786L").keySet(),
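For context, SIMPLE_PARTITION_FETCHER is a test fixture that is not shown in the quoted hunk. Under the assumption that it simply derives the partition id from the LLC-style segment name (as the test comment suggests), it could look roughly like the sketch below; the real constant in the PR may differ, and LLCSegmentName#getPartitionGroupId is assumed to be the parsing entry point.

```java
import java.util.function.ToIntFunction;
import org.apache.pinot.common.utils.LLCSegmentName;

public class LlcPartitionIdFetcherSketch {
  // LLC segment names encode "tableName__partitionGroupId__sequenceNumber__creationTime", so the partition id
  // can be read from the name itself without touching ZK segment metadata (which doesn't exist for these fake tables).
  static final ToIntFunction<String> LLC_NAME_BASED_FETCHER =
      segmentName -> new LLCSegmentName(segmentName).getPartitionGroupId();

  public static void main(String[] args) {
    // "segment__1__0__9834786L" -> partition id 1
    System.out.println(LLC_NAME_BASED_FETCHER.applyAsInt("segment__1__0__9834786L"));
  }
}
```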
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2084759396

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ##

(Quotes the same getNextAssignment / getNextStrictReplicaGroupAssignment hunk shown in the first comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085025521

## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java: ##

(Quotes the same testAssignmentWithServerBatching hunk shown in the TableRebalancerTest comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024994

## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java: ##

(Quotes the same testAssignmentWithServerBatching hunk shown in the TableRebalancerTest comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024597

## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java: ##

(Quotes the same testAssignmentWithServerBatching hunk shown in the TableRebalancerTest comment above.)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085023573

## pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java: ##

@@ -105,568 +105,672 @@ public void setUp()
   @Test
   public void testRebalance()
       throws Exception {
-    int numServers = 3;
-    // Mock disk usage
-    Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
-
-    for (int i = 0; i < numServers; i++) {
-      String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
-      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-      DiskUsageInfo diskUsageInfo1 =
-          new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis());
-      diskUsageInfoMap.put(instanceId, diskUsageInfo1);
-    }
-
-    ExecutorService executorService = Executors.newFixedThreadPool(10);
-    DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
-    preChecker.init(_helixResourceManager, executorService, 1);
-    TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader());
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
-
-    // Rebalance should fail without creating the table
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-    assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-    // Rebalance with dry-run summary should fail without creating the table
-    RebalanceConfig rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-    assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-    // Create the table
-    addDummySchema(RAW_TABLE_NAME);
-    _helixResourceManager.addTable(tableConfig);
-
-    // Add the segments
-    int numSegments = 10;
-    for (int i = 0; i < numSegments; i++) {
-      _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-          SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null);
-    }
-    Map<String, Map<String, String>> oldSegmentAssignment =
-        _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
-
-    // Rebalance with dry-run summary should return NO_OP status
-    rebalanceConfig = new RebalanceConfig();
-    rebalanceConfig.setDryRun(true);
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-    RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-    assertNotNull(rebalanceSummaryResult);
-    assertNotNull(rebalanceSummaryResult.getServerInfo());
-    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-    assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0);
-    assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0);
-    assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
-    assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3);
-    assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3);
-    assertNotNull(rebalanceSummaryResult.getTagsInfo());
-    assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-        TagNameUtils.getOfflineTagForTenant(null));
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS);
-    assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers);
-    assertNotNull(rebalanceResult.getInstanceAssignment());
-    assertNotNull(rebalanceResult.getSegmentAssignment());
-
-    // Dry-run mode should not change the IdealState
-    assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-        oldSegmentAssignment);
-
-    // Rebalance should return NO_OP status
-    rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
-    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-
-    // All servers should be assigned to the table
-    Map<InstancePartitionsType, InstancePartitions> instanceAssignment = rebalanceResult.getInstanceAssignment();
-    assertEquals(instanceAssignment.size(), 1);
-    InstancePartitions instancePartition
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024296 ## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java: ## @@ -163,6 +163,74 @@ public void testRebalance() "Failed to drop servers"); } + @Test + public void testRebalanceWithBatching() + throws Exception { +populateTables(); + +verifyIdealState(5, NUM_SERVERS); + +// setup the rebalance config +RebalanceConfig rebalanceConfig = new RebalanceConfig(); +rebalanceConfig.setDryRun(false); +rebalanceConfig.setMinAvailableReplicas(0); +rebalanceConfig.setIncludeConsuming(true); +rebalanceConfig.setBatchSizePerServer(1); + +// Add a new server +BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS); + +// Now we trigger a rebalance operation +TableConfig tableConfig = _resourceManager.getTableConfig(REALTIME_TABLE_NAME); +RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + +// Check the number of replicas after rebalancing +int finalReplicas = _resourceManager.getServerInstancesForTable(getTableName(), TableType.REALTIME).size(); + +// Check that a replica has been added Review Comment: This was copy-pasted from the other test; I've fixed up both to say "server" instead of "replica".
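For reference, a sketch of how the renamed check could read inside the quoted test. This is a fragment only, not the PR's final code; it reuses `_resourceManager`, `getTableName()`, `NUM_SERVERS`, and the TestNG `assertEquals` that already appear in the quoted test class.

```java
// The value returned by getServerInstancesForTable(...).size() is a count of servers hosting the
// table, not a count of replicas, hence the "server" rename described in the review comment.
int numServersAfterRebalance =
    _resourceManager.getServerInstancesForTable(getTableName(), TableType.REALTIME).size();

// Check that a server has been added: NUM_SERVERS servers existed before, plus the one started above.
assertEquals(numServersAfterRebalance, NUM_SERVERS + 1);
```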
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2084769589 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); +} else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedIn
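To make the grouping described in the quoted javadoc concrete, here is a small self-contained sketch of how a partitionId -> assigned-instance-set -> currentAssignment map could be built. This is not the PR's actual helper (whose name is truncated above as `getPartitionIdToAssignedIn...`); `ToIntFunction<String>` stands in for the PR's `PartitionIdFetcher`, and the assignment maps are assumed to follow the usual segment -> instance -> state shape used by `TableRebalancer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {

  /**
   * Groups the current assignment first by partitionId, then by the exact set of instances a
   * segment is currently assigned to. With StrictRealtimeSegmentAssignment every segment of a
   * partition shares one instance set, so the inner map has a single entry; with routing-only
   * strict replica groups there may be several entries per partitionId.
   */
  static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> groupByPartitionAndInstances(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> grouped = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      int partitionId = partitionIdFetcher.applyAsInt(segmentName);
      grouped.computeIfAbsent(partitionId, k -> new HashMap<>())
          // Copy the key set so the grouping key is stable even if the state map changes later
          .computeIfAbsent(Set.copyOf(instanceStateMap.keySet()), k -> new TreeMap<>())
          .put(segmentName, instanceStateMap);
    }
    return grouped;
  }
}
```

When batching is disabled, the quoted code skips this grouping entirely and places the whole current assignment under partitionId 0 with a dummy instance-set key, so the downstream loop still sees a single uniform structure.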
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2084759396 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. The + * strict replica group routing can also be utilized for OFFLINE tables, thus StrictRealtimeSegmentAssignment + * also needs to be made more generic for the OFFLINE case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssig
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2083767580 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); +} else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedIn
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2083768379 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); +} else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedIn
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2083768586 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); +} else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedIn
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
yashmayya commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2083511042 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, the instances assigned for the same partitionId can be different for different segments. + * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given + * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from + * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment, + * this map will have a single entry for 'unique set of instance assignments'. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side + * will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of + * instances. Special handling to check each group of assigned instances can be removed in that case. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { +return enableStrictReplicaGroup +? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map, Map>>> +partitionIdToAssignedInstancesToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled + // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0 + // and a dummy set for the assigned instances. + partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>()); + partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment); +} else { + partitionIdToAssignedInstancesToCurrentAssignmentMap = + getPartitionIdToAssignedI
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080683879 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but Review Comment: Actually, I take that back. After discussion with @Jackie-Jiang, there is some benefit for some use cases in using `StrictReplicaGroup` for OFFLINE tables. We need to change the segment assignment part to work generically with this concept in the future.
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080686074 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080689742 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. 
+ * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but Review Comment: done ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1595,13 +1875,41 @@ private static Map> getNextNonStrictReplicaGroupAssi Map nextInstanceStateMap = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap)._instanceStateMap; - nextAssignment.put(segmentName, nextInstanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), - nextInstanceStateMap.keySet()); + Set serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, + nextInstanceStateMap); + boolean anyServerExhaustedBatchSize = false; + if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { +for (String server : serversAddedForSegment) { + if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) { +anyServerExhaustedBatchSize = true; +break; + } +} + } + if (anyServerExhaustedBatchSize) { +// Exhausted the batch size for at least 1 server, set to existing assignment +nextAssignment.put(segmentName, currentInstanceStateMap); + } else { +// Add the next assignment and update the segments added so far counts +for (String server : serversAddedForSegment) { + int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0); + serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1); +} Review Comment: done
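The non-strict path quoted above boils down to a per-server budget check. Below is a minimal, self-contained sketch of that check; it is not the PR's code (`tryChargeSegmentMove` is an invented name, the sentinel constant merely mirrors `RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER`, and the real method additionally computes the next instance-state map and writes it into `nextAssignment`).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BatchBudgetSketch {
  /** Sentinel value mirroring RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER from the quoted diff. */
  static final int DISABLE_BATCH_SIZE_PER_SERVER = -1;

  /**
   * Decides whether moving one segment still fits every receiving server's batch budget and, if
   * so, charges those servers. Returns true if the move can be applied in the current step;
   * returns false (and charges nothing) if any receiving server has already hit the budget, in
   * which case the caller keeps the segment's current assignment for this step.
   */
  static boolean tryChargeSegmentMove(Set<String> currentInstances, Set<String> targetInstances,
      int batchSizePerServer, Map<String, Integer> serverToNumSegmentsAddedSoFar) {
    // Only servers that newly receive the segment consume batch budget.
    Set<String> serversAdded = new HashSet<>(targetInstances);
    serversAdded.removeAll(currentInstances);

    if (batchSizePerServer != DISABLE_BATCH_SIZE_PER_SERVER) {
      for (String server : serversAdded) {
        if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
          // At least one server has exhausted its budget; defer this segment to a later step.
          return false;
        }
      }
    }
    // Charge every server that gains the segment in this step.
    for (String server : serversAdded) {
      serverToNumSegmentsAddedSoFar.merge(server, 1, Integer::sum);
    }
    return true;
  }
}
```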
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080689596 ## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ## @@ -625,6 +625,13 @@ public RebalanceResult rebalance( + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode, @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime " + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, + @ApiParam(value = "How many maximum segment adds per server to update in the IdealState in each step. For " + + "non-strict replica group based assignment, this number will be the closest possible without splitting up " + + "a single segment's step's replicas across steps (so some servers may get fewer segments). For strict " + + "replica group based assignment, this is a per-server best effort value since each partition of a replica " + + "group must be moved as a whole and at least one partition in a replica group should be moved. A value of " + + "-1 is used to disable batching (unlimited segments).") Review Comment: done ## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ## @@ -625,6 +625,13 @@ public RebalanceResult rebalance( + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode, @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime " + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, + @ApiParam(value = "How many maximum segment adds per server to update in the IdealState in each step. For " + + "non-strict replica group based assignment, this number will be the closest possible without splitting up " + + "a single segment's step's replicas across steps (so some servers may get fewer segments). For strict " Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
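To make the documented batchSizePerServer semantics concrete, here is a small hypothetical walk-through with 3 replicas per segment and batchSizePerServer=2; it only illustrates the behaviour described in the ApiParam text above and is not code from the PR:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchSemanticsExample {
  public static void main(String[] args) {
    int batchSizePerServer = 2;
    Map<String, Integer> addedSoFar = new TreeMap<>();
    // Each segment is added to all of its target servers or not at all (replicas are never split across steps).
    List<List<String>> segmentsToAdd = List.of(
        List.of("s1", "s2", "s3"),   // segA
        List.of("s1", "s2", "s3"),   // segB -> s1, s2, s3 each reach 2
        List.of("s1", "s4", "s5"));  // segC -> deferred because s1 already reached 2; s4 and s5 stay at 0

    for (List<String> servers : segmentsToAdd) {
      boolean exhausted = servers.stream()
          .anyMatch(s -> addedSoFar.getOrDefault(s, 0) >= batchSizePerServer);
      if (!exhausted) {
        servers.forEach(s -> addedSoFar.merge(s, 1, Integer::sum));
      }
    }
    // s4 and s5 end the step with fewer than batchSizePerServer segments: the "closest possible" behaviour.
    System.out.println(addedSoFar);  // {s1=2, s2=2, s3=2}
  }
}
```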
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080686074 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
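The branch quoted above only builds the partitionId-to-current-assignment map when batching is enabled, and otherwise collapses everything under a dummy key. A rough standalone sketch of that grouping, using a `ToIntFunction<String>` as a stand-in for the PR's `PartitionIdFetcher` (whose exact interface is not shown in this thread):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {
  /**
   * Groups a segment -> (instance -> state) assignment by partition id so each partition can be moved
   * as a whole. When batching is disabled, the whole assignment is kept under a single dummy key (0),
   * mirroring the DISABLE_BATCH_SIZE_PER_SERVER branch in the hunk above.
   */
  static Map<Integer, Map<String, Map<String, String>>> groupByPartitionId(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher,
      boolean batchingEnabled) {
    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap = new TreeMap<>();
    if (!batchingEnabled) {
      partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
      return partitionIdToCurrentAssignmentMap;
    }
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      int partitionId = partitionIdFetcher.applyAsInt(entry.getKey());
      partitionIdToCurrentAssignmentMap
          .computeIfAbsent(partitionId, k -> new TreeMap<>())
          .put(entry.getKey(), entry.getValue());
    }
    return partitionIdToCurrentAssignmentMap;
  }
}
```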
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080680182 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. Review Comment: yes that is correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
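For readers following the thread, a toy illustration of the invariant being confirmed here, with purely hypothetical segment and server names:

```java
import java.util.Map;

public class StrictAssignmentInvariantExample {
  public static void main(String[] args) {
    // With StrictRealtimeSegmentAssignment, every segment of a given partition is hosted by the same
    // instance within each replica group, so the instance sets below match across partition 0's segments.
    Map<String, Map<String, String>> partition0Assignment = Map.of(
        "mytable__0__10__20240101T0000Z", Map.of("Server_rg1_a", "ONLINE", "Server_rg2_a", "ONLINE"),
        "mytable__0__11__20240102T0000Z", Map.of("Server_rg1_a", "CONSUMING", "Server_rg2_a", "CONSUMING"));
    partition0Assignment.values().forEach(instanceStateMap -> System.out.println(instanceStateMap.keySet()));
  }
}
```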
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2080212649 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1595,13 +1875,41 @@ private static Map> getNextNonStrictReplicaGroupAssi Map nextInstanceStateMap = getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap, assignmentMap)._instanceStateMap; - nextAssignment.put(segmentName, nextInstanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), - nextInstanceStateMap.keySet()); + Set serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap, Review Comment: yes that's correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2079974548 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2079976643 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but Review Comment: At least for `REALTIME` this should go away once we fix the issue. Need to figure out the situation with `OFFLINE` though -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
yashmayya commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2079510950 ## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ## @@ -625,6 +625,13 @@ public RebalanceResult rebalance( + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode, @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime " + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, + @ApiParam(value = "How many maximum segment adds per server to update in the IdealState in each step. For " + + "non-strict replica group based assignment, this number will be the closest possible without splitting up " + + "a single segment's step's replicas across steps (so some servers may get fewer segments). For strict " + + "replica group based assignment, this is a per-server best effort value since each partition of a replica " + + "group must be moved as a whole and at least one partition in a replica group should be moved. A value of " + + "-1 is used to disable batching (unlimited segments).") Review Comment: > unlimited segments Might be useful to explicitly clarify that this is "per incremental step in the rebalance", while keeping the min available replicas invariant intact so that users don't get confused about the batching mechanism's semantics. ## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ## @@ -625,6 +625,13 @@ public RebalanceResult rebalance( + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode, @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime " + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts, + @ApiParam(value = "How many maximum segment adds per server to update in the IdealState in each step. For " + + "non-strict replica group based assignment, this number will be the closest possible without splitting up " + + "a single segment's step's replicas across steps (so some servers may get fewer segments). For strict " Review Comment: > a single segment's step's replicas across steps nit: this wording is a little confusing ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. 
For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. Review Comment: `StrictRealtimeSegmentAssignment` tries to ensure that all segments in a partition (`CONSUMING` and `COMPLETED`) are assigned to a _single_ instance in each replica group right? ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** +
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078549279 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078533623 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078477653 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas(); boolean lowDiskMode = rebalanceConfig.isLowDiskMode(); boolean bestEfforts = rebalanceConfig.isBestEfforts(); +int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); +Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= -1, +"TableRebalance batchSizePerServer must be > 0 or -1 to disable"); Review Comment: No, when we first enter the loop, serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) for the server will be 0, so we add at least 1 segment and the rebalance moves forward. I have tested with batchSizePerServer=1; let me know if I misunderstood your question, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
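A trivial check of the point above, under the assumption that the per-server counters start empty at the beginning of each rebalance step:

```java
import java.util.HashMap;
import java.util.Map;

public class FirstSegmentAlwaysApplies {
  public static void main(String[] args) {
    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
    int batchSizePerServer = 1;
    // 0 >= 1 is false, so the first segment considered for a server is never deferred:
    // every step adds at least one segment per touched server and the rebalance keeps moving forward.
    boolean exhausted = serverToNumSegmentsAddedSoFar.getOrDefault("server1", 0) >= batchSizePerServer;
    System.out.println(exhausted);  // false
  }
}
```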
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078488673 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas(); boolean lowDiskMode = rebalanceConfig.isLowDiskMode(); boolean bestEfforts = rebalanceConfig.isBestEfforts(); +int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); +Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= -1, +"TableRebalance batchSizePerServer must be > 0 or -1 to disable"); Review Comment: Oh okay, the checks happen before making the assignment, got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078295696 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078485359 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078480229 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
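The javadoc quoted above describes a three-way split between strict replica-group assignment, strict replica-group routing only, and the non-strict path. The sketch below restates that dispatch in plain Java so the branch conditions are easier to follow; the three branch methods are placeholders (they just return the current assignment) standing in for the PR's real getNext*Assignment methods.

```java
import java.util.Map;

final class NextAssignmentDispatchSketch {

  static Map<String, Map<String, String>> getNextAssignment(
      Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment,
      int minAvailableReplicas, boolean enableStrictReplicaGroup, boolean lowDiskMode,
      int batchSizePerServer, boolean isStrictRealtimeSegmentAssignment) {
    if (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) {
      // Routing and assignment agree: every segment of a partition shares the same instances,
      // so partitions can be batched and moved as a whole.
      return strictReplicaGroup(currentAssignment, targetAssignment, minAvailableReplicas,
          lowDiskMode, batchSizePerServer);
    } else if (enableStrictReplicaGroup) {
      // Strict replica-group routing without StrictRealtimeSegmentAssignment: segments of the
      // same partition may live on different instances, so they are not grouped by partition id.
      return strictReplicaGroupRoutingOnly(currentAssignment, targetAssignment,
          minAvailableReplicas, lowDiskMode, batchSizePerServer);
    } else {
      return nonStrictReplicaGroup(currentAssignment, targetAssignment, minAvailableReplicas,
          lowDiskMode, batchSizePerServer);
    }
  }

  // Placeholder bodies: the real methods compute the next IdealState step.
  private static Map<String, Map<String, String>> strictReplicaGroup(
      Map<String, Map<String, String>> current, Map<String, Map<String, String>> target,
      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer) {
    return current;
  }

  private static Map<String, Map<String, String>> strictReplicaGroupRoutingOnly(
      Map<String, Map<String, String>> current, Map<String, Map<String, String>> target,
      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer) {
    return current;
  }

  private static Map<String, Map<String, String>> nonStrictReplicaGroup(
      Map<String, Map<String, String>> current, Map<String, Map<String, String>> target,
      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer) {
    return current;
  }

  private NextAssignmentDispatchSketch() {
  }
}
```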
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2078477653 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas(); boolean lowDiskMode = rebalanceConfig.isLowDiskMode(); boolean bestEfforts = rebalanceConfig.isBestEfforts(); +int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); +Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= -1, +"TableRebalance batchSizePerServer must be > 0 or -1 to disable"); Review Comment: No, when we first enter the loop, serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) for the server will be 0, so we add at least one segment and the rebalance moves forward. I have tested with batchSizePerServer=1. Let me know if I misunderstood your question, though.
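The reply above hinges on serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) starting at zero. Here is a small, hypothetical helper illustrating why that guarantees forward progress even with batchSizePerServer = 1; the loop structure is illustrative, only the getOrDefault check mirrors the comment.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the per-server batching guard discussed above. Because the counter starts
// at 0 (< batchSizePerServer for any valid value >= 1), the first segment that needs a server
// is always admitted, so the rebalance makes progress even with batchSizePerServer = 1.
final class BatchCapSketch {

  static boolean canAssignToServers(Map<String, Integer> serverToNumSegmentsAddedSoFar,
      List<String> candidateServers, int batchSizePerServer) {
    for (String server : candidateServers) {
      if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
        return false;  // This server already received its batch for the current step.
      }
    }
    return true;
  }

  static void recordAssignment(Map<String, Integer> serverToNumSegmentsAddedSoFar,
      List<String> servers) {
    for (String server : servers) {
      serverToNumSegmentsAddedSoFar.merge(server, 1, Integer::sum);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> counter = new HashMap<>();
    List<String> servers = List.of("server1", "server2");
    int batchSizePerServer = 1;  // Must be > 0, or -1 to disable (per the Preconditions check).
    System.out.println(canAssignToServers(counter, servers, batchSizePerServer));  // true
    recordAssignment(counter, servers);
    System.out.println(canAssignToServers(counter, servers, batchSizePerServer));  // false
  }

  private BatchCapSketch() {
  }
}
```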
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2063997789 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment, +LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. + * + * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment) + * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do + * not need to be assigned to the same servers. For strict replica group routing with strict replica group + * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they + * have the same servers assigned across all segments belonging to the same partitionId. + * + * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but + * this is not enforced in the code today. Once enforcement is added, there will no longer be any need to + * handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the + * getNextStrictReplicaGroupRoutingOnlyAssignment() function. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { -return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) -: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) { +return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) +? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode, +batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: enableStrictReplicaGroup +? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger) +: getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
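The new method signatures quoted above thread an Object2IntOpenHashMap segmentPartitionIdMap and a PartitionIdFetcher through the call chain, i.e. a memoized segment-to-partition-id lookup. Below is a hedged sketch of that caching pattern using only JDK collections; the real PR uses fastutil's primitive map, and the fetcher interface shape shown here is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the partition-id cache implied by the segmentPartitionIdMap / PartitionIdFetcher
// parameters in the quoted signatures. A plain HashMap is used here to stay dependency-free;
// the PR avoids boxing with fastutil's Object2IntOpenHashMap.
final class PartitionIdCacheSketch {

  @FunctionalInterface
  interface PartitionIdFetcher {
    int fetch(String segmentName);
  }

  private final Map<String, Integer> _segmentPartitionIdMap = new HashMap<>();
  private final PartitionIdFetcher _fetcher;

  PartitionIdCacheSketch(PartitionIdFetcher fetcher) {
    _fetcher = fetcher;
  }

  // Resolve the partition id once per segment and reuse it across rebalance steps, since
  // getNextAssignment is invoked repeatedly while converging to the target assignment.
  int partitionIdOf(String segmentName) {
    return _segmentPartitionIdMap.computeIfAbsent(segmentName, _fetcher::fetch);
  }
}
```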
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059622741 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
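The tail of the quoted diff shows the availableInstancesMap bookkeeping that enforces minAvailableReplicas for strict replica groups. The following is a simplified paraphrase of that idea, not the PR's exact code: segments landing on the same instance set share an availability set that only shrinks via retainAll, and a segment falls back to its current assignment when the intersection would drop below the minimum.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class StrictReplicaGroupSketch {

  static Map<String, Map<String, String>> chooseNextAssignment(
      Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> proposedAssignment,
      Map<String, Set<String>> availableInstancesPerSegment, int minAvailableReplicas) {
    Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
    Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();

    for (Map.Entry<String, Map<String, String>> entry : proposedAssignment.entrySet()) {
      String segment = entry.getKey();
      Set<String> assignedInstances = entry.getValue().keySet();
      Set<String> availableInstances =
          new HashSet<>(availableInstancesPerSegment.getOrDefault(segment, Set.of()));

      Set<String> groupAvailable = availableInstancesMap.get(assignedInstances);
      if (groupAvailable == null) {
        // First segment landing on this instance set: accept and remember its availability.
        availableInstancesMap.put(assignedInstances, availableInstances);
        nextAssignment.put(segment, entry.getValue());
      } else {
        // Other segments already share this instance set: shrink availability to the
        // intersection and only accept if the minimum-available-replicas bar still holds.
        availableInstances.retainAll(groupAvailable);
        if (availableInstances.size() >= minAvailableReplicas) {
          availableInstancesMap.put(assignedInstances, availableInstances);
          nextAssignment.put(segment, entry.getValue());
        } else {
          nextAssignment.put(segment, currentAssignment.get(segment));
        }
      }
    }
    return nextAssignment;
  }

  private StrictReplicaGroupSketch() {
  }
}
```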
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059084112 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
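Both branches above also consult numSegmentsToOffloadMap, built by getNumSegmentsToOffloadMap(currentAssignment, targetAssignment). A short sketch of what such a map plausibly contains, per server: the surplus of currently hosted segments over the target. The exact bookkeeping in the PR may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the numSegmentsToOffloadMap idea referenced in the quoted diff: for each instance,
// how many more segments it currently hosts than the target assignment wants it to host.
// Positive values mean the instance should shed segments; the rebalancer can use this to prefer
// offloading from the most loaded instances first.
final class OffloadCountSketch {

  static Map<String, Integer> numSegmentsToOffload(
      Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Map<String, Integer> counts = new HashMap<>();
    for (Map<String, String> instanceStateMap : currentAssignment.values()) {
      for (String instance : instanceStateMap.keySet()) {
        counts.merge(instance, 1, Integer::sum);   // +1 for every segment currently hosted
      }
    }
    for (Map<String, String> instanceStateMap : targetAssignment.values()) {
      for (String instance : instanceStateMap.keySet()) {
        counts.merge(instance, -1, Integer::sum);  // -1 for every segment the target wants here
      }
    }
    return counts;
  }

  private OffloadCountSketch() {
  }
}
```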
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059154741 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
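getNextSingleSegmentAssignment, called inside the loop quoted above, decides how far a single segment may move in one step. The sketch below is a deliberate simplification of that idea under stated assumptions: keep at least minAvailableReplicas of the current replicas and fill the remaining slots from the target. The real method additionally accounts for low disk mode and the per-instance offload counts, which are omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SingleSegmentStepSketch {

  static Map<String, String> nextStep(Map<String, String> currentInstanceStateMap,
      Map<String, String> targetInstanceStateMap, int minAvailableReplicas) {
    Map<String, String> next = new LinkedHashMap<>();
    // Keep current replicas first, up to the minimum we must preserve so queries stay served.
    for (Map.Entry<String, String> entry : currentInstanceStateMap.entrySet()) {
      if (next.size() >= minAvailableReplicas) {
        break;
      }
      next.put(entry.getKey(), entry.getValue());
    }
    // Fill the remaining replica slots from the target assignment, converging gradually.
    for (Map.Entry<String, String> entry : targetInstanceStateMap.entrySet()) {
      if (next.size() >= targetInstanceStateMap.size()) {
        break;
      }
      next.putIfAbsent(entry.getKey(), entry.getValue());
    }
    return next;
  }

  private SingleSegmentStepSketch() {
  }
}
```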
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059114227 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059113164 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059110355 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ## @@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String tableNameWithType, String segment } } + /** + * Uses the default LOGGER + */ + @VisibleForTesting + static Map> getNextAssignment(Map> currentAssignment, + Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher) { +return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, +lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER); + } + /** * Returns the next assignment for the table based on the current assignment and the target assignment with regard to * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment. */ - @VisibleForTesting - static Map> getNextAssignment(Map> currentAssignment, + private static Map> getNextAssignment(Map> currentAssignment, Map> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup, - boolean lowDiskMode) { + boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap segmentPartitionIdMap, + PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) { return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, -minAvailableReplicas, lowDiskMode) +minAvailableReplicas, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, +tableRebalanceLogger) : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, -lowDiskMode); +lowDiskMode, batchSizePerServer); } private static Map> getNextStrictReplicaGroupAssignment( Map> currentAssignment, Map> targetAssignment, - int minAvailableReplicas, boolean lowDiskMode) { + int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer, + Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher, + Logger tableRebalanceLogger) { Map> nextAssignment = new TreeMap<>(); Map numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment); +Map>> partitionIdToCurrentAssignmentMap; +if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) { + // Don't calculate the partition id to current assignment mapping if batching is disabled since + // we want to update the next assignment based on all partitions in this case + partitionIdToCurrentAssignmentMap = new TreeMap<>(); + partitionIdToCurrentAssignmentMap.put(0, currentAssignment); +} else { + partitionIdToCurrentAssignmentMap = + getPartitionIdToCurrentAssignmentMap(currentAssignment, segmentPartitionIdMap, partitionIdFetcher); +} Map, Set>, Set> assignmentMap = new HashMap<>(); Map, Set> availableInstancesMap = new HashMap<>(); -for (Map.Entry> entry : currentAssignment.entrySet()) { - String segmentName = entry.getKey(); - Map currentInstanceStateMap = entry.getValue(); - Map targetInstanceStateMap = targetAssignment.get(segmentName); - SingleSegmentAssignment assignment = - getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas, - lowDiskMode, numSegmentsToOffloadMap, assignmentMap); - Set assignedInstances = assignment._instanceStateMap.keySet(); - Set availableInstances = assignment._availableInstances; - availableInstancesMap.compute(assignedInstances, (k, currentAvailableInstances) -> { -if (currentAvailableInstances == null) { - // First segment assigned to these instances, use the new assignment and update the available instances - nextAssignment.put(segmentName, assignment._instanceStateMap); - updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(), k); - return availableInstances; -} else { - // There are other segments assigned to the same instances, check the available instances to see if adding the - // new assignment can still hold the minimum available replicas requirement - availableInstances.retainAll(currentAvailableInstances); -
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059103904

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -1434,67 +1444,209 @@ (same getNextAssignment / getNextStrictReplicaGroupAssignment hunk as quoted in full above)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059094890

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -1434,67 +1444,209 @@ (same getNextAssignment / getNextStrictReplicaGroupAssignment hunk as quoted in full above)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059080890

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
         externalViewStabilizationTimeoutInMs);
     int expectedVersion = currentIdealState.getRecord().getVersion();
 
+    // Cache segment partition id to avoid ZK reads. Similar behavior as cache used in StrictReplicaGroupAssignment
+    // NOTE:
+    // 1. This cache is used for table rebalance only, but not segment assignment. During rebalance, rebalanceTable()
+    //    can be invoked multiple times when the ideal state changes during the rebalance process.
+    // 2. The cache won't be refreshed when an existing segment is replaced with a segment from a different partition.
+    //    Replacing a segment with a segment from a different partition should not be allowed for upsert table because
+    //    it will cause the segment being served by the wrong servers. If this happens during the table rebalance,
+    //    another rebalance might be needed to fix the assignment.
+    Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>();

Review Comment: I am not against the idea, but I want some thoughts from @Jackie-Jiang on this. That cache is private to `StrictRealtimeSegmentAssignment` and not relevant for other segment assignments, so I definitely don't want to expose an interface method to access it. The other option is to make it public, but I think that leaks the internal behavior of `StrictRealtimeSegmentAssignment` outside.
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2059084112

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -1434,67 +1444,209 @@ (same getNextAssignment / getNextStrictReplicaGroupAssignment hunk as quoted in full above)
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
J-HowHuang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2058977515

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -479,6 +482,16 @@ (the doRebalance hunk that adds the segmentPartitionIdMap cache, quoted in full above)

Review Comment: Is it possible to get the same cache map object from `segmentAssignment` if it's an instance of `StrictRealtimeSegmentAssignment`? IIUC the cache would already be available while computing the next assignment.

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -1434,67 +1444,209 @@ (same getNextAssignment / getNextStrictReplicaGroupAssignment hunk as quoted in full above)
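The question above concerns reusing the partition-id cache; the batching itself works by bounding how many segment additions each server receives per IdealState update step. The following is a hedged sketch of that per-server bookkeeping under assumed names (`withinBatchBudget`, `serverToSegmentsAddedSoFar`); the PR's actual accounting may differ in detail.

```java
import java.util.Map;

public class ServerBatchBudgetSketch {

  /**
   * Returns true if admitting the given per-server segment additions keeps every server within the
   * batchSizePerServer budget for the current IdealState update step.
   */
  static boolean withinBatchBudget(Map<String, Integer> serverToSegmentsAddedSoFar,
      Map<String, Integer> serverToSegmentsToAdd, int batchSizePerServer) {
    for (Map.Entry<String, Integer> entry : serverToSegmentsToAdd.entrySet()) {
      int projected = serverToSegmentsAddedSoFar.getOrDefault(entry.getKey(), 0) + entry.getValue();
      if (projected > batchSizePerServer) {
        return false;
      }
    }
    return true;
  }

  /** Records the additions once a segment group has been admitted into the current step. */
  static void recordAdds(Map<String, Integer> serverToSegmentsAddedSoFar,
      Map<String, Integer> serverToSegmentsToAdd) {
    serverToSegmentsToAdd.forEach((server, count) -> serverToSegmentsAddedSoFar.merge(server, count, Integer::sum));
  }
}
```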
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2055214455

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -598,9 +611,12 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
             "Rebalance has stopped already before updating the IdealState", instancePartitionsMap,
             tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
       }
+      String partitionColumn = TableConfigUtils.getPartitionColumn(tableConfig);
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,

Review Comment: done

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:

@@ -624,6 +624,13 @@ public RebalanceResult rebalance(
           + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") boolean lowDiskMode,
       @ApiParam(value = "Whether to use best-efforts to rebalance (not fail the rebalance when the no-downtime "
           + "contract cannot be achieved)") @DefaultValue("false") @QueryParam("bestEfforts") boolean bestEfforts,
+      @ApiParam(value = "How many maximum segment adds per server to update in the IdealState in each step. For "
+          + "non-strict replica group based assignment, this number will be the closest possible without splitting up "
+          + "a single segment's replicas across steps (so some servers may get fewer segments). For strict "
+          + "replica group based assignment, this is a per-server best effort value since each partition of a replica "
+          + "group must be moved as a whole and at least one partition in a replica group should be moved. A value of "
+          + "Integer.MAX_VALUE is used to indicate an unlimited batch size, which is the non-batching behavior.")
+      @DefaultValue("2147483647") @QueryParam("batchSizePerServer") int batchSizePerServer,

Review Comment: done
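The `batchSizePerServer` query parameter quoted above is wired into the existing rebalance endpoint. As a usage illustration only (the controller address, table name, and the other query parameters below are assumptions, not taken from the PR), a caller could trigger a batched rebalance like this:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RebalanceWithBatchingExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical controller host and table name; batchSizePerServer caps segment adds per server per step.
    String url = "http://localhost:9000/tables/myTable/rebalance"
        + "?type=REALTIME&dryRun=false&batchSizePerServer=100";
    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```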
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
somandal commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2055075416

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -479,6 +482,16 @@ (the doRebalance hunk that adds the segmentPartitionIdMap cache, quoted in full above)

Review Comment: This is actually picked up from `StrictRealtimeSegmentAssignment`, which has this optimization to reduce the overhead of computing the partitionId in case it has to be fetched from SegmentZkMetadata. I decided to add it here as well, since otherwise that optimization would essentially be undone. I thought of exposing it from `StrictRealtimeSegmentAssignment` as well, but it would require an interface change, so I decided against it 😅

Reference code: https://github.com/apache/pinot/blob/9f030bb0f20de69595156c18188946b2d2876e1c/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java#L69
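As a rough illustration of the optimization described above, the sketch below caches partition ids in an `Object2IntOpenHashMap` so the expensive lookup (for example, reading SegmentZkMetadata) runs at most once per segment per rebalance run. The class and field names here are assumptions for illustration, not the PR's implementation.

```java
import java.util.function.ToIntFunction;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

public class PartitionIdCacheSketch {
  private final Object2IntOpenHashMap<String> _segmentPartitionIdCache = new Object2IntOpenHashMap<>();
  private final ToIntFunction<String> _expensivePartitionIdLookup;

  public PartitionIdCacheSketch(ToIntFunction<String> expensivePartitionIdLookup) {
    _expensivePartitionIdLookup = expensivePartitionIdLookup;
  }

  /**
   * Returns the partition id for a segment, consulting the in-memory cache first so that the expensive
   * lookup is performed at most once per segment.
   */
  public int getPartitionId(String segmentName) {
    if (_segmentPartitionIdCache.containsKey(segmentName)) {
      return _segmentPartitionIdCache.getInt(segmentName);
    }
    int partitionId = _expensivePartitionIdLookup.applyAsInt(segmentName);
    _segmentPartitionIdCache.put(segmentName, partitionId);
    return partitionId;
  }
}
```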
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
Jackie-Jiang commented on code in PR #15617: URL: https://github.com/apache/pinot/pull/15617#discussion_r2054996050

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -479,6 +482,16 @@ (the doRebalance hunk that adds the segmentPartitionIdMap cache, quoted in full above)

Review Comment: For my understanding, is this introduced to reduce the overhead of computing the partition id for the same segment multiple times? Does this improvement also apply to the current algorithm?

## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:

@@ -598,9 +611,12 @@ (the doRebalance hunk that adds the partitionColumn lookup before getNextAssignment, quoted in full above)

Review Comment: Seems most of the new arguments are used to calculate the partition id; consider passing a function (lambda) to simplify this.

## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:

@@ -624,6 +624,13 @@ (the batchSizePerServer @ApiParam hunk, quoted in full above)

Review Comment: Let's make the default `0` or `-1` and treat non-positive values as unlimited.
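A hedged sketch of how the two suggestions above might look in code: the partition-id computation passed as a lambda behind a small functional interface, and non-positive `batchSizePerServer` values treated as batching disabled. All names here are illustrative; the PR may implement this differently.

```java
public class ReviewSuggestionSketch {

  /** Functional interface so the partition-id computation can be passed as a lambda. */
  @FunctionalInterface
  public interface PartitionIdFetcher {
    int fetch(String segmentName);
  }

  /** Treats 0 or any negative value as "no per-server batching", per the suggested default. */
  static boolean isBatchingDisabled(int batchSizePerServer) {
    return batchSizePerServer <= 0;
  }

  public static void main(String[] args) {
    // Placeholder lambda: a real fetcher would consult the table's partition column or segment metadata.
    PartitionIdFetcher fetcher = segmentName -> (segmentName.hashCode() & Integer.MAX_VALUE) % 8;
    System.out.println("partitionId=" + fetcher.fetch("mySegment__3__0__20240101T0000Z"));
    System.out.println("batching disabled for -1? " + isBatchingDisabled(-1));
  }
}
```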
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]
codecov-commenter commented on PR #15617: URL: https://github.com/apache/pinot/pull/15617#issuecomment-2822368891

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/15617?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `98.16514%` with `2 lines` in your changes missing coverage. Please review.

> Project coverage is 62.75%. Comparing base [(`9f030bb`)](https://app.codecov.io/gh/apache/pinot/commit/9f030bb0f20de69595156c18188946b2d2876e1c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`e0c7281`)](https://app.codecov.io/gh/apache/pinot/commit/e0c72819555cd8c4781a5d6474bcaa98b6256223?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).

| [Files with missing lines](https://app.codecov.io/gh/apache/pinot/pull/15617?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...ntroller/helix/core/rebalance/TableRebalancer.java](https://app.codecov.io/gh/apache/pinot/pull/15617?src=pr&el=tree&filepath=pinot-controller%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcontroller%2Fhelix%2Fcore%2Frebalance%2FTableRebalancer.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS9UYWJsZVJlYmFsYW5jZXIuamF2YQ==) | 98.00% | [0 Missing and 2 partials :warning: ](https://app.codecov.io/gh/apache/pinot/pull/15617?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master   #15617      +/-  ##
- Coverage   62.77%   62.75%   -0.02%
+ Complexity   1384     1374      -10
  Files        2864     2864
  Lines      162734   162812      +78
  Branches    24917    24934      +17
+ Hits       102151   102175      +24
- Misses      52875    52926      +51
- Partials     7708     7711       +3
```

| [Flag](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration1](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [integration2](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-11](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [java-21](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `62.75% <98.16%> (+0.02%)` | :arrow_up: |
| [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `62.74% <98.16%> (-0.02%)` | :arrow_down: |
| [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `62.73% <98.16%> (+0.01%)` | :arrow_up: |
| [temurin](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `62.75% <98.16%> (-0.02%)` | :arrow_down: |
| [unittests](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_ter