Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087036696


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   * <p>
+   * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance assignments'.
+   * <p>
+   * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but
+   *       this is not enforced in the code today. Once enforcement is added, then the routing side and assignment side
+   *       will be equivalent and all segments belonging to a given partitionId will be assigned to the same set of
+   *       instances. Special handling to check each group of assigned instances can be removed in that case. The
+   *       strict replica group routing can also be utilized for OFFLINE tables, thus StrictRealtimeSegmentAssignment
+   *       also needs to be made more generic for the OFFLINE case.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
-    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+    return enableStrictReplicaGroup
+        ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode,
+            batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
         : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-            lowDiskMode);
+            lowDiskMode, batchSizePerServer);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+        partitionIdToAssignedInstancesToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to assigned instances to current assignment mapping if batching is disabled
+      // since we want to update the next assignment based on all partitions in this case. Use partitionId as 0
+      // and a dummy set for the assigned instances.
+      partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new HashMap<>());
+      partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), currentAssignment);
+    } else {
+      partitionIdToAssignedInstancesToCurrentAssignmentMap =
+          getPartitionIdToAssignedIn
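To make the "partitionId -> unique set of assigned instances -> currentAssignment" structure described in the quoted javadoc concrete, here is a minimal, self-contained sketch of how such a grouping could be built. The class name, method name, and the toy string-splitting partition-id fetcher are invented for illustration; this is not the PR's actual helper.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {

  // Groups segments by partitionId, then by the exact set of instances they are currently assigned to.
  // With StrictRealtimeSegmentAssignment every segment of a partition shares one instance set, so the inner
  // map has a single entry; with strict replica group routing alone it can have several.
  static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> groupByPartitionAndInstances(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> grouped = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segment = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      int partitionId = partitionIdFetcher.applyAsInt(segment);
      grouped.computeIfAbsent(partitionId, k -> new HashMap<>())
          .computeIfAbsent(new TreeSet<>(instanceStateMap.keySet()), k -> new TreeMap<>())
          .put(segment, instanceStateMap);
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
    currentAssignment.put("segment__1__0__100", Map.of("host1", "ONLINE", "host2", "ONLINE"));
    currentAssignment.put("segment__1__1__101", Map.of("host1", "ONLINE", "host2", "ONLINE"));
    currentAssignment.put("segment__2__0__102", Map.of("host2", "ONLINE", "host3", "ONLINE"));
    // Toy fetcher: the partition id is the second "__" token of the LLC-style segment name
    System.out.println(groupByPartitionAndInstances(currentAssignment, s -> Integer.parseInt(s.split("__")[1])));
  }
}

When batching is disabled, the quoted code skips this grouping entirely and wraps the whole current assignment into a single bucket (partitionId 0 with a dummy Set.of("") key), so the same downstream loop can handle both modes.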

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2086941117


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087034298


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2087037492


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2086296902


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-13 Thread via GitHub


yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085993960


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085595949


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085026933


##
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##
@@ -1438,4 +1464,1011 @@ public void testIsExternalViewConverged() {
       }
     }
   }
+
+  @Test
+  public void testAssignmentWithServerBatching() {
+    // Using LLC segment naming so that batching can be tested for both non-strict replica group and strict replica
+    // group based assignment. Otherwise, the partitionId might have to be fetched from ZK SegmentMetadata which will
+    // not exist since these aren't actual tables
+    // Current assignment:
+    // {
+    //   "segment__1__0__9834786L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__2__0__9834786L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   },
+    //   "segment__3__0__9834786L": {
+    //     "host1": "ONLINE",
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE"
+    //   },
+    //   "segment__4__0__9834786L": {
+    //     "host2": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host4": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    currentAssignment.put("segment__1__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment__2__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+    currentAssignment.put("segment__3__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host2", "host3"), ONLINE));
+    currentAssignment.put("segment__4__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host3", "host4"), ONLINE));
+
+    // Target assignment 1:
+    // {
+    //   "segment__1__0__9834786L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__2__0__9834786L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   },
+    //   "segment__3__0__9834786L": {
+    //     "host1": "ONLINE",
+    //     "host3": "ONLINE",
+    //     "host5": "ONLINE"
+    //   },
+    //   "segment__4__0__9834786L": {
+    //     "host2": "ONLINE",
+    //     "host4": "ONLINE",
+    //     "host6": "ONLINE"
+    //   }
+    // }
+    Map<String, Map<String, String>> targetAssignment = new TreeMap<>();
+    targetAssignment.put("segment__1__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment__2__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+    targetAssignment.put("segment__3__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", "host3", "host5"), ONLINE));
+    targetAssignment.put("segment__4__0__9834786L",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", "host4", "host6"), ONLINE));
+
+    // Number of segments to offload:
+    // {
+    //   "host1": 0,
+    //   "host2": 2,
+    //   "host3": 2,
+    //   "host4": 0,
+    //   "host5": -2,
+    //   "host6": -2
+    // }
+    Map<String, Integer> numSegmentsToOffloadMap =
+        TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    assertEquals(numSegmentsToOffloadMap.size(), 6);
+    assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+    assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+    assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+    // Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target
+    // assignment after two steps. Batch size = 1, unique partitionIds
+    for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+      Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
+      Map<String, Map<String, String>> nextAssignment =
+          TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+              1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+      assertNotEquals(nextAssignment, targetAssignment);
+      assertEquals(nextAssignment.get("segment__1__0__9834786L").keySet(),
+          new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+      assertEquals(nextAssignment.get("segment__2__0__9834786L").keySet(),
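The test relies on two helpers whose bodies are not shown in this excerpt: SIMPLE_PARTITION_FETCHER and TableRebalancer.getNumSegmentsToOffloadMap. Below is a minimal, self-contained sketch of the arithmetic they imply, using the test's own segment names and host lists. The method names, the List-based instance maps, and the string-splitting partition fetcher are assumptions for illustration, not the PR's actual implementation.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OffloadMapSketch {
  // Hypothetical stand-in for SIMPLE_PARTITION_FETCHER: LLC-style segment names look like
  // "<prefix>__<partitionId>__<sequence>__<creationTime>", so the partition id is the second "__" token.
  static int partitionIdOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  // Per server: (segments currently hosted) minus (segments hosted in the target assignment).
  // Positive values mean the server must offload segments; negative values mean it gains segments.
  static Map<String, Integer> numSegmentsToOffload(Map<String, List<String>> current,
      Map<String, List<String>> target) {
    Map<String, Integer> counts = new TreeMap<>();
    current.values().forEach(hosts -> hosts.forEach(h -> counts.merge(h, 1, Integer::sum)));
    target.values().forEach(hosts -> hosts.forEach(h -> counts.merge(h, -1, Integer::sum)));
    return counts;
  }

  public static void main(String[] args) {
    Map<String, List<String>> current = new LinkedHashMap<>();
    current.put("segment__1__0__9834786L", Arrays.asList("host1", "host2", "host3"));
    current.put("segment__2__0__9834786L", Arrays.asList("host2", "host3", "host4"));
    current.put("segment__3__0__9834786L", Arrays.asList("host1", "host2", "host3"));
    current.put("segment__4__0__9834786L", Arrays.asList("host2", "host3", "host4"));

    Map<String, List<String>> target = new LinkedHashMap<>();
    target.put("segment__1__0__9834786L", Arrays.asList("host1", "host3", "host5"));
    target.put("segment__2__0__9834786L", Arrays.asList("host2", "host4", "host6"));
    target.put("segment__3__0__9834786L", Arrays.asList("host1", "host3", "host5"));
    target.put("segment__4__0__9834786L", Arrays.asList("host2", "host4", "host6"));

    System.out.println(partitionIdOf("segment__3__0__9834786L"));  // 3
    // Same values as the expected map in the test: {host1=0, host2=2, host3=2, host4=0, host5=-2, host6=-2}
    System.out.println(numSegmentsToOffload(current, target));
  }
}

Per the test comment, with a batch size of 1 per server and hosts 2 and 3 each needing to offload two segments, the target assignment is reached after two steps.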

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2084759396


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085025521


##
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024994


##
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024597


##
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##
@@ -1438,4 +1464,1011 @@ public void testIsExternalViewConverged() {
   }
 }
   }
+
+  @Test
+  public void testAssignmentWithServerBatching() {
+// Using LLC segment naming so that batching can be tested for both 
non-strict replica group and strict replica
+// group based assignment. Otherwise, the partitionId might have to be 
fetched from ZK SegmentMetadata which will
+// not exist since these aren't actual tables
+// Current assignment:
+// {
+//   "segment__1__0__9834786L": {
+// "host1": "ONLINE",
+// "host2": "ONLINE",
+// "host3": "ONLINE"
+//   },
+//   "segment__2__0__9834786L": {
+// "host2": "ONLINE",
+// "host3": "ONLINE",
+// "host4": "ONLINE"
+//   },
+//   "segment__3__0__9834786L": {
+// "host1": "ONLINE",
+// "host2": "ONLINE",
+// "host3": "ONLINE"
+//   },
+//   "segment__4__0__9834786L": {
+// "host2": "ONLINE",
+// "host3": "ONLINE",
+// "host4": "ONLINE"
+//   }
+// }
+Map> currentAssignment = new TreeMap<>();
+currentAssignment.put("segment__1__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+currentAssignment.put("segment__2__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+currentAssignment.put("segment__3__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+currentAssignment.put("segment__4__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4"), ONLINE));
+
+// Target assignment 1:
+// {
+//   "segment__1__0__9834786L": {
+// "host1": "ONLINE",
+// "host3": "ONLINE",
+// "host5": "ONLINE"
+//   },
+//   "segment__2__0__9834786L": {
+// "host2": "ONLINE",
+// "host4": "ONLINE",
+// "host6": "ONLINE"
+//   },
+//   "segment__3__0__9834786L": {
+// "host1": "ONLINE",
+// "host3": "ONLINE",
+// "host5": "ONLINE"
+//   },
+//   "segment__4__0__9834786L": {
+// "host2": "ONLINE",
+// "host4": "ONLINE",
+// "host6": "ONLINE"
+//   }
+// }
+Map> targetAssignment = new TreeMap<>();
+targetAssignment.put("segment__1__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+targetAssignment.put("segment__2__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+targetAssignment.put("segment__3__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host5"), ONLINE));
+targetAssignment.put("segment__4__0__9834786L",
+SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host4", "host6"), ONLINE));
+
+// Number of segments to offload:
+// {
+//   "host1": 0,
+//   "host2": 2,
+//   "host3": 2,
+//   "host4": 0,
+//   "host5": -2,
+//   "host6": -2
+// }
+Map numSegmentsToOffloadMap =
+TableRebalancer.getNumSegmentsToOffloadMap(currentAssignment, 
targetAssignment);
+assertEquals(numSegmentsToOffloadMap.size(), 6);
+assertEquals((int) numSegmentsToOffloadMap.get("host1"), 0);
+assertEquals((int) numSegmentsToOffloadMap.get("host2"), 2);
+assertEquals((int) numSegmentsToOffloadMap.get("host3"), 2);
+assertEquals((int) numSegmentsToOffloadMap.get("host4"), 0);
+assertEquals((int) numSegmentsToOffloadMap.get("host5"), -2);
+assertEquals((int) numSegmentsToOffloadMap.get("host6"), -2);
+
+// Next assignment with 2 minimum available replicas with or without strict replica-group should reach the target
+// assignment after two steps. Batch size = 1, unique partitionIds
+for (boolean enableStrictReplicaGroup : Arrays.asList(false, true)) {
+  Object2IntOpenHashMap<String> segmentToPartitionIdMap = new Object2IntOpenHashMap<>();
+  Map<String, Map<String, String>> nextAssignment =
+      TableRebalancer.getNextAssignment(currentAssignment, targetAssignment, 2, enableStrictReplicaGroup, false,
+          1, segmentToPartitionIdMap, SIMPLE_PARTITION_FETCHER);
+  assertNotEquals(nextAssignment, targetAssignment);
+  assertEquals(nextAssignment.get("segment__1__0__9834786L").keySet(),
+      new TreeSet<>(Arrays.asList("host1", "host3", "host5")));
+  assertEquals(nextAssignment.get("segment__2__0__9834786L").keySet(),
+ 
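For reference, here is a minimal, self-contained sketch (not part of the quoted test, and not necessarily how TableRebalancer.getNumSegmentsToOffloadMap is implemented) showing how the per-server offload counts asserted above follow from the two assignments: a server's count is the number of segments it hosts in the current assignment minus the number it hosts in the target assignment, so positive values are segments to give up and negative values are segments to take on.

import java.util.HashMap;
import java.util.Map;

public class NumSegmentsToOffloadSketch {
  // Illustrative helper only: current-assignment count minus target-assignment count per server.
  // For the assignments above this yields host1=0, host2=2, host3=2, host4=0, host5=-2, host6=-2.
  static Map<String, Integer> computeNumSegmentsToOffload(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment) {
    Map<String, Integer> numSegmentsToOffload = new HashMap<>();
    for (Map<String, String> instanceStateMap : currentAssignment.values()) {
      for (String server : instanceStateMap.keySet()) {
        numSegmentsToOffload.merge(server, 1, Integer::sum);
      }
    }
    for (Map<String, String> instanceStateMap : targetAssignment.values()) {
      for (String server : instanceStateMap.keySet()) {
        numSegmentsToOffload.merge(server, -1, Integer::sum);
      }
    }
    return numSegmentsToOffload;
  }
}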

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085023573


##
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##
@@ -105,568 +105,672 @@ public void setUp()
   @Test
   public void testRebalance()
   throws Exception {
-int numServers = 3;
-// Mock disk usage
-Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
-
-for (int i = 0; i < numServers; i++) {
-  String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
-  addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
-  DiskUsageInfo diskUsageInfo1 =
-      new DiskUsageInfo(instanceId, "", 1000L, 500L, System.currentTimeMillis());
-  diskUsageInfoMap.put(instanceId, diskUsageInfo1);
-}
-
-ExecutorService executorService = Executors.newFixedThreadPool(10);
-DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
-preChecker.init(_helixResourceManager, executorService, 1);
-TableRebalancer tableRebalancer =
-    new TableRebalancer(_helixManager, null, null, preChecker, _helixResourceManager.getTableSizeReader());
-TableConfig tableConfig =
-    new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
-
-// Rebalance should fail without creating the table
-RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
-assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-// Rebalance with dry-run summary should fail without creating the table
-RebalanceConfig rebalanceConfig = new RebalanceConfig();
-rebalanceConfig.setDryRun(true);
-rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
-assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
-assertNull(rebalanceResult.getRebalanceSummaryResult());
-
-// Create the table
-addDummySchema(RAW_TABLE_NAME);
-_helixResourceManager.addTable(tableConfig);
-
-// Add the segments
-int numSegments = 10;
-for (int i = 0; i < numSegments; i++) {
-  _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
-      SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i), null);
-}
-Map<String, Map<String, String>> oldSegmentAssignment =
-    _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
-
-// Rebalance with dry-run summary should return NO_OP status
-rebalanceConfig = new RebalanceConfig();
-rebalanceConfig.setDryRun(true);
-rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
-assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
-assertNotNull(rebalanceSummaryResult);
-assertNotNull(rebalanceSummaryResult.getServerInfo());
-assertNotNull(rebalanceSummaryResult.getSegmentInfo());
-assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0);
-assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeDeleted(), 0);
-assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
-assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3);
-assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3);
-assertNotNull(rebalanceSummaryResult.getTagsInfo());
-assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
-assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
-    TagNameUtils.getOfflineTagForTenant(null));
-assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(), 0);
-assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS);
-assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(), numServers);
-assertNotNull(rebalanceResult.getInstanceAssignment());
-assertNotNull(rebalanceResult.getSegmentAssignment());
-
-// Dry-run mode should not change the IdealState
-assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
-    oldSegmentAssignment);
-
-// Rebalance should return NO_OP status
-rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null);
-assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
-
-// All servers should be assigned to the table
-Map instanceAssignment = rebalanceResult.getInstanceAssignment();
-assertEquals(instanceAssignment.size(), 1);
-InstancePartitions instancePartition
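The dry-run flow above predates batching; below is a minimal sketch (not code from the PR) of combining the same dry-run API with the new batch-size knob, assuming the table config and rebalancer are set up as in the quoted test. Batching should only change how many steps a real rebalance takes, not its end state, so the dry-run summary totals should match an unbatched plan.

RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(true);
// New in this PR: cap on how many segments a single server picks up in one rebalance step
rebalanceConfig.setBatchSizePerServer(2);
RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
if (rebalanceResult.getStatus() != RebalanceResult.Status.FAILED) {
  RebalanceSummaryResult summary = rebalanceResult.getRebalanceSummaryResult();
  // Inspect summary.getSegmentInfo() / summary.getServerInfo() as in the assertions above.
}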

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085024296


##
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java:
##
@@ -163,6 +163,74 @@ public void testRebalance()
 "Failed to drop servers");
   }
 
+  @Test
+  public void testRebalanceWithBatching()
+  throws Exception {
+populateTables();
+
+verifyIdealState(5, NUM_SERVERS);
+
+// setup the rebalance config
+RebalanceConfig rebalanceConfig = new RebalanceConfig();
+rebalanceConfig.setDryRun(false);
+rebalanceConfig.setMinAvailableReplicas(0);
+rebalanceConfig.setIncludeConsuming(true);
+rebalanceConfig.setBatchSizePerServer(1);
+
+// Add a new server
+BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS);
+
+// Now we trigger a rebalance operation
+TableConfig tableConfig = _resourceManager.getTableConfig(REALTIME_TABLE_NAME);
+RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null);
+
+// Check the number of replicas after rebalancing
+int finalReplicas = _resourceManager.getServerInstancesForTable(getTableName(), TableType.REALTIME).size();
+
+// Check that a replica has been added

Review Comment:
   Copy-pasted from the other test - I've fixed up both to say "server" instead of "replica".






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2084769589


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), 
currentAssignment);
+} else {
+  partitionIdToAssignedInstancesToCurrentAssignmentMap =
+  
getPartitionIdToAssignedIn
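To make the batching guarantee concrete, here is an illustrative check (not part of the PR's tests) that a single step returned by getNextAssignment adds at most batchSizePerServer new segments to any one server. Per the quoted javadoc, the strict replica-group path moves a whole partition's segments together, so there the cap is best-effort rather than exact.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BatchSizeCheckSketch {
  // Counts, per server, the segments that appear on it in nextAssignment but not in currentAssignment,
  // and flags any server that exceeds the configured batch size.
  static void checkBatchSizeRespected(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> nextAssignment, int batchSizePerServer) {
    Map<String, Integer> addedPerServer = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : nextAssignment.entrySet()) {
      Set<String> currentServers = currentAssignment.getOrDefault(entry.getKey(), Map.of()).keySet();
      for (String server : entry.getValue().keySet()) {
        if (!currentServers.contains(server)) {
          addedPerServer.merge(server, 1, Integer::sum);
        }
      }
    }
    addedPerServer.forEach((server, added) -> {
      if (added > batchSizePerServer) {
        throw new IllegalStateException(
            "Server " + server + " was assigned " + added + " new segments in a single step");
      }
    });
  }
}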

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-12 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2084759396


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case. The
+   *   strict replica group routing can also be utilized for OFFLINE 
tables, thus StrictRealtimeSegmentAssignment
+   *   also needs to be made more generic for the OFFLINE case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssig
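The name of the helper that builds this structure is cut off in the quote, so here is an illustrative reconstruction (not the PR's code) of the "partitionId -> unique set of assigned instances -> current assignment" map the javadoc describes. The partition-id lookup is passed in as a plain function because PartitionIdFetcher's exact interface is not shown here; with StrictRealtimeSegmentAssignment every partitionId should end up with a single instance-set key.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {
  // Groups the current assignment first by partitionId, then by the exact set of instances a segment is
  // assigned to, so each group can be moved and batched independently.
  static Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> groupByPartitionAndInstances(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdOfSegment) {
    Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>> result = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      int partitionId = partitionIdOfSegment.applyAsInt(entry.getKey());
      Set<String> assignedInstances = new TreeSet<>(entry.getValue().keySet());
      result.computeIfAbsent(partitionId, k -> new HashMap<>())
          .computeIfAbsent(assignedInstances, k -> new TreeMap<>())
          .put(entry.getKey(), entry.getValue());
    }
    return result;
  }
}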

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-11 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2083767580


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), 
currentAssignment);
+} else {
+  partitionIdToAssignedInstancesToCurrentAssignmentMap =
+  
getPartitionIdToAssignedIn
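Putting the pieces together, a conceptual sketch (not the PR's actual control loop) of how a batched rebalance converges: keep computing the next assignment and applying it until the target is reached, so a larger batchSizePerServer simply means fewer steps. The call below uses the test-visible getNextAssignment overload quoted earlier; applying the assignment and waiting for convergence is elided.

Map<String, Map<String, String>> assignment = currentAssignment;
int steps = 0;
while (!assignment.equals(targetAssignment)) {
  assignment = TableRebalancer.getNextAssignment(assignment, targetAssignment, minAvailableReplicas,
      enableStrictReplicaGroup, lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher);
  steps++;
  // ...apply `assignment` to the IdealState and wait for the external view to converge before the next step...
}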

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-11 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2083768379


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), 
currentAssignment);
+} else {
+  partitionIdToAssignedInstancesToCurrentAssignmentMap =
+  
getPartitionIdToAssignedIn

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-11 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2083768586


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), 
currentAssignment);
+} else {
+  partitionIdToAssignedInstancesToCurrentAssignmentMap =
+  
getPartitionIdToAssignedIn

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-11 Thread via GitHub


yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2083511042


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   * 
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, the instances assigned for the same partitionId 
can be different for different segments.
+   * For strict replica group routing with StrictRealtimeSegmentAssignment on 
the other hand, the assignment for a given
+   * partitionId will be the same across all segments. We can treat both cases 
similarly by creating a mapping from
+   * partitionId -> unique set of instance assignments -> currentAssignment. 
With StrictRealtimeSegmentAssignment,
+   * this map will have a single entry for 'unique set of instance 
assignments'.
+   * 
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
then the routing side and assignment side
+   *   will be equivalent and all segments belonging to a given 
partitionId will be assigned to the same set of
+   *   instances. Special handling to check each group of assigned 
instances can be removed in that case.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+return enableStrictReplicaGroup
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map, Map>>>
+partitionIdToAssignedInstancesToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to assigned instances to current 
assignment mapping if batching is disabled
+  // since we want to update the next assignment based on all partitions 
in this case. Use partitionId as 0
+  // and a dummy set for the assigned instances.
+  partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new 
HashMap<>());
+  
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""), 
currentAssignment);
+} else {
+  partitionIdToAssignedInstancesToCurrentAssignmentMap =
+  
getPartitionIdToAssignedI

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080683879


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but

Review Comment:
   Actually I take that back. After discussion with @Jackie-Jiang there is some 
benefit for some use cases to use `StrictReplicaGroup` for OFFLINE tables. We 
need to change the segment assignment part to work generically with this 
concept in the future.






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080686074


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String tableNameWithType, String segment
     }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment) {
+    return getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
+        lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+        LOGGER);
+  }
+
   /**
    * Returns the next assignment for the table based on the current assignment and the target assignment with regard to
    * the minimum available replicas requirement. For strict replica-group mode, track the available instances for all
    * the segments with the same instances in the next assignment, and ensure the minimum available replicas requirement
    * is met. If adding the assignment for a segment breaks the requirement, use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
    */
-  @VisibleForTesting
-  static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
+  private static Map<String, Map<String, String>> getNextAssignment(Map<String, Map<String, String>> currentAssignment,
       Map<String, Map<String, String>> targetAssignment, int minAvailableReplicas, boolean enableStrictReplicaGroup,
-      boolean lowDiskMode) {
-    return enableStrictReplicaGroup ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-        minAvailableReplicas, lowDiskMode)
-        : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
-        lowDiskMode);
+      boolean lowDiskMode, int batchSizePerServer, Object2IntOpenHashMap<String> segmentPartitionIdMap,
+      PartitionIdFetcher partitionIdFetcher, boolean isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+    return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+        ? getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas, lowDiskMode,
+            batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
+        : enableStrictReplicaGroup
+            ? getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+                lowDiskMode, batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, tableRebalanceLogger)
+            : getNextNonStrictReplicaGroupAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
+                lowDiskMode, batchSizePerServer);
   }
 
   private static Map<String, Map<String, String>> getNextStrictReplicaGroupAssignment(
       Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
-      int minAvailableReplicas, boolean lowDiskMode) {
+      int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+      Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher,
+      Logger tableRebalanceLogger) {
     Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
     Map<String, Integer> numSegmentsToOffloadMap = getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignmentMap;
+    if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+      // Don't calculate the partition id to current assignment mapping if batching is disabled since
+      // we want to update the next assignment based on all partitions in this case
+      partitionIdToCurrentAssignmentMap = new TreeMap<>();
+      partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080689742


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but

Review Comment:
   done



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1595,13 +1875,41 @@ private static Map<String, Map<String, String>> getNextNonStrictReplicaGroupAssi
       Map<String, String> nextInstanceStateMap =
           getNextSingleSegmentAssignment(currentInstanceStateMap, targetInstanceStateMap, minAvailableReplicas,
               lowDiskMode, numSegmentsToOffloadMap, assignmentMap)._instanceStateMap;
-      nextAssignment.put(segmentName, nextInstanceStateMap);
-      updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, currentInstanceStateMap.keySet(),
-          nextInstanceStateMap.keySet());
+      Set<String> serversAddedForSegment = getServersAddedInSingleSegmentAssignment(currentInstanceStateMap,
+          nextInstanceStateMap);
+      boolean anyServerExhaustedBatchSize = false;
+      if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+        for (String server : serversAddedForSegment) {
+          if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
+            anyServerExhaustedBatchSize = true;
+            break;
+          }
+        }
+      }
+      if (anyServerExhaustedBatchSize) {
+        // Exhausted the batch size for at least 1 server, set to existing assignment
+        nextAssignment.put(segmentName, currentInstanceStateMap);
+      } else {
+        // Add the next assignment and update the segments added so far counts
+        for (String server : serversAddedForSegment) {
+          int numSegmentsAdded = serverToNumSegmentsAddedSoFar.getOrDefault(server, 0);
+          serverToNumSegmentsAddedSoFar.put(server, numSegmentsAdded + 1);
+        }

Review Comment:
   done
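
For readers following the quoted hunk above, here is a minimal, self-contained sketch of the per-server batch-size check it adds. The names (batchSizePerServer, serverToNumSegmentsAddedSoFar, serversAddedForSegment) mirror the quoted code, but the surrounding class and the sentinel constant are simplified assumptions, not the exact PR code.

import java.util.Map;
import java.util.Set;

public class BatchSizeCheckSketch {
  // Assumed to match RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER (-1 disables batching).
  static final int DISABLE_BATCH_SIZE_PER_SERVER = -1;

  // Returns true if any server that would receive this segment has already been assigned
  // batchSizePerServer segments in the current rebalance step.
  static boolean anyServerExhaustedBatchSize(Set<String> serversAddedForSegment,
      Map<String, Integer> serverToNumSegmentsAddedSoFar, int batchSizePerServer) {
    if (batchSizePerServer == DISABLE_BATCH_SIZE_PER_SERVER) {
      return false;
    }
    for (String server : serversAddedForSegment) {
      if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >= batchSizePerServer) {
        return true;
      }
    }
    return false;
  }

  // Bumps the per-server counters once a segment's move is accepted into this step.
  static void recordSegmentAdds(Set<String> serversAddedForSegment,
      Map<String, Integer> serverToNumSegmentsAddedSoFar) {
    for (String server : serversAddedForSegment) {
      serverToNumSegmentsAddedSoFar.merge(server, 1, Integer::sum);
    }
  }
}

When the check returns true, the segment keeps its current assignment and is picked up in a later step; otherwise the new assignment is recorded and the counters are bumped, which is what the if/else in the quoted hunk does.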






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080689596


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -625,6 +625,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "
+  + "replica group based assignment, this is a per-server best effort 
value since each partition of a replica "
+  + "group must be moved as a whole and at least one partition in a 
replica group should be moved. A value of "
+  + "-1 is used to disable batching (unlimited segments).")

Review Comment:
   done



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -625,6 +625,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "

Review Comment:
   done






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080686074


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080680182


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.

Review Comment:
   yes that is correct
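
To make the partitionId grouping being confirmed here concrete, below is a hedged sketch of building the partitionId -> current-assignment sub-maps used when strict replica-group routing is paired with strict replica-group assignment. The ToIntFunction stands in for the PR's PartitionIdFetcher, and the class and method names are illustrative, not the PR's code.

import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class PartitionGroupingSketch {
  // Groups the current assignment (segment -> instance state map) by partition id so that,
  // under strict replica-group assignment, all segments of a partition can be moved together.
  static Map<Integer, Map<String, Map<String, String>>> groupByPartitionId(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<String, Map<String, String>>> partitionIdToCurrentAssignment = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      int partitionId = partitionIdFetcher.applyAsInt(entry.getKey());
      partitionIdToCurrentAssignment
          .computeIfAbsent(partitionId, k -> new TreeMap<>())
          .put(entry.getKey(), entry.getValue());
    }
    return partitionIdToCurrentAssignment;
  }
}

Each sub-map then moves as a unit during a step: either every segment of the partition advances toward the target assignment, or the whole partition is deferred, which is why the batch size is only a best-effort limit in this mode.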






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2080212649


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1595,13 +1875,41 @@ private static Map> 
getNextNonStrictReplicaGroupAssi
   Map nextInstanceStateMap =
   getNextSingleSegmentAssignment(currentInstanceStateMap, 
targetInstanceStateMap, minAvailableReplicas,
   lowDiskMode, numSegmentsToOffloadMap, 
assignmentMap)._instanceStateMap;
-  nextAssignment.put(segmentName, nextInstanceStateMap);
-  updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, 
currentInstanceStateMap.keySet(),
-  nextInstanceStateMap.keySet());
+  Set serversAddedForSegment = 
getServersAddedInSingleSegmentAssignment(currentInstanceStateMap,

Review Comment:
   yes that's correct






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2079974548


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2079974548


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2079976643


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but

Review Comment:
   At least for `REALTIME` this should go away once we fix the issue. Need to 
figure out the situation with `OFFLINE` though
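
As a side note for readers of this thread, the three-way ternary quoted above is easier to follow written out as if/else. The sketch below only illustrates the dispatch order and reuses the helper names from the hunk; everything else (parameters, bodies) is elided and illustrative.

// Illustrative only: how the quoted ternary dispatches between the three helpers.
public class NextAssignmentDispatchSketch {
  static String chooseNextAssignmentHelper(boolean enableStrictReplicaGroup, boolean isStrictRealtimeSegmentAssignment) {
    if (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment) {
      // Strict routing + strict assignment: segments of a partition share servers, so partitions move as a whole.
      return "getNextStrictReplicaGroupAssignment";
    }
    if (enableStrictReplicaGroup) {
      // Strict routing only: per-partition server sets can differ, so each unique instance set is handled separately.
      return "getNextStrictReplicaGroupRoutingOnlyAssignment";
    }
    // Non-strict: segments are considered independently, subject only to the per-server batch size.
    return "getNextNonStrictReplicaGroupAssignment";
  }
}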






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-08 Thread via GitHub


yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2079510950


##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -625,6 +625,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "
+  + "replica group based assignment, this is a per-server best effort 
value since each partition of a replica "
+  + "group must be moved as a whole and at least one partition in a 
replica group should be moved. A value of "
+  + "-1 is used to disable batching (unlimited segments).")

Review Comment:
   > unlimited segments
   
   Might be useful to explicitly clarify that this is "per incremental step in 
the rebalance", while keeping the min available replicas invariant intact so 
that users don't get confused about the batching mechanism's semantics.
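
To illustrate the "per incremental step" semantics being asked for here (an illustration of the intended behaviour under stated assumptions, not code from the PR): for a single server, the number of IdealState update steps needed to receive its new segments is roughly a ceiling division, with -1 collapsing everything into one step.

// Illustration only: rough number of incremental IdealState updates needed for one server,
// given how many new segments it must receive and the batchSizePerServer setting.
public class BatchStepEstimateSketch {
  static int estimatedStepsForServer(int segmentAddsForServer, int batchSizePerServer) {
    if (batchSizePerServer == -1) {
      // -1 disables batching: all adds for this server land in a single step
      return segmentAddsForServer > 0 ? 1 : 0;
    }
    // Ceiling division: each step adds at most batchSizePerServer segments to this server,
    // while every step still honors the minAvailableReplicas invariant.
    return (segmentAddsForServer + batchSizePerServer - 1) / batchSizePerServer;
  }
}

The rebalance as a whole takes roughly the maximum of this across servers, and strict replica-group tables can need more steps because whole partitions move together, which is why the description calls the limit best-effort there.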



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -625,6 +625,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "

Review Comment:
   > a single segment's step's replicas across steps
   
   nit: this wording is a little confusing



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.

Review Comment:
   `StrictRealtimeSegmentAssignment` tries to ensure that all segments in a partition (`CONSUMING` and `COMPLETED`)
   are assigned to a _single_ instance in each replica group, right?



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078549279


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078549279


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


J-HowHuang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078533623


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment)

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078477653


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
     int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas();
     boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
     boolean bestEfforts = rebalanceConfig.isBestEfforts();
+    int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+    Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= -1,
+        "TableRebalance batchSizePerServer must be > 0 or -1 to disable");

Review Comment:
   no, when we first enter the loop, serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) for the server will be 0,
   which is less than any positive batchSizePerServer. So we add at least 1 segment and the rebalance moves forward.
   I have tested with batchSizePerServer=1.
   
   Let me know if I misunderstood your question, though.
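
A tiny runnable walk-through of the progress argument above, using simplified stand-in names rather than the PR's actual code: because the counter is checked before it is bumped, the first candidate segment for an untouched server always gets through, even with batchSizePerServer=1.

import java.util.HashMap;
import java.util.Map;

public class BatchSizeOneProgressDemo {
  public static void main(String[] args) {
    int batchSizePerServer = 1;
    Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();

    for (String segment : new String[]{"seg_0", "seg_1"}) {
      // Check first: an untouched server has a count of 0, which is < 1, so the move is accepted.
      boolean exhausted = serverToNumSegmentsAddedSoFar.getOrDefault("server_1", 0) >= batchSizePerServer;
      if (!exhausted) {
        serverToNumSegmentsAddedSoFar.merge("server_1", 1, Integer::sum);
        System.out.println(segment + " moved in this step");
      } else {
        System.out.println(segment + " deferred to a later step");
      }
    }
    // Output: seg_0 moved in this step / seg_1 deferred to a later step
  }
}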






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


J-HowHuang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078488673


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 int minReplicasToKeepUpForNoDowntime = 
rebalanceConfig.getMinAvailableReplicas();
 boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
 boolean bestEfforts = rebalanceConfig.isBestEfforts();
+int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= 
-1,
+"TableRebalance batchSizePerServer must be > 0 or -1 to disable");

Review Comment:
   oh okay the checks are before making assignment, got it.
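
For reference, the accepted values from the quoted precondition, restated as a small hedged helper (the class and method names are illustrative; the condition itself is copied from the hunk above):

// Mirrors the quoted precondition: batchSizePerServer must be -1 (batching disabled) or a positive value.
public class BatchSizeValidationSketch {
  static boolean isValidBatchSizePerServer(int batchSizePerServer) {
    return batchSizePerServer != 0 && batchSizePerServer >= -1;
  }
  // isValidBatchSizePerServer(-1) -> true   (batching disabled)
  // isValidBatchSizePerServer(0)  -> false  (rejected by the precondition)
  // isValidBatchSizePerServer(4)  -> true   (at most 4 segment adds per server per IdealState update step)
}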






Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


J-HowHuang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078295696


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only v.s. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

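To make the partitionId grouping described in the javadoc above easier to picture, here is a minimal, illustrative sketch (the class PartitionGrouping, the method buildPartitionIdToAssignmentMap, and the ToIntFunction-based fetcher are assumptions for illustration, not the PR's actual helper) of bucketing the current assignment by partition id so that a strict-replica-group partition can be considered as a whole:

import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Illustrative sketch: group each segment's instance-state map by its partition id so that all segments
// of a partition are evaluated together when strict replica-group assignment is in use.
final class PartitionGrouping {
  private PartitionGrouping() {
  }

  static Map<Integer, Map<String, Map<String, String>>> buildPartitionIdToAssignmentMap(
      Map<String, Map<String, String>> currentAssignment, ToIntFunction<String> partitionIdFetcher) {
    Map<Integer, Map<String, Map<String, String>>> byPartitionId = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      int partitionId = partitionIdFetcher.applyAsInt(entry.getKey());
      byPartitionId.computeIfAbsent(partitionId, k -> new TreeMap<>())
          .put(entry.getKey(), entry.getValue());
    }
    return byPartitionId;
  }
}

With batching disabled, the same shape can be kept by putting the entire current assignment under a single key (partition id 0), which is what the quoted hunk does.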
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078485359


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only vs. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078480229


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1524,67 +1535,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only vs. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-05-07 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2078477653


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -224,6 +225,9 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 int minReplicasToKeepUpForNoDowntime = 
rebalanceConfig.getMinAvailableReplicas();
 boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
 boolean bestEfforts = rebalanceConfig.isBestEfforts();
+int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
+Preconditions.checkState(batchSizePerServer != 0 && batchSizePerServer >= 
-1,
+"TableRebalance batchSizePerServer must be > 0 or -1 to disable");

Review Comment:
   No, when we first enter the loop, serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) for the 
server will be 0, so we add at least 1 segment and the rebalance moves forward. I have tested 
with batchSizePerServer=1.
   
   Let me know if I misunderstood your question, though.
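To illustrate the point above, here is a small, self-contained sketch of the per-server budget check (the class name BatchBudget and its exact shape are illustrative assumptions, not the PR's code): because every counter starts at 0, getOrDefault(server, 0) lets the first segment for a server through, so the rebalance always makes progress even with batchSizePerServer=1.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the per-server batching budget discussed in the comment above.
final class BatchBudget {
  private final Map<String, Integer> _serverToNumSegmentsAddedSoFar = new HashMap<>();
  private final int _batchSizePerServer;

  BatchBudget(int batchSizePerServer) {
    _batchSizePerServer = batchSizePerServer;
  }

  // Returns true and records the segment if every assigned server is still under its batch budget.
  // On the first call for a server, getOrDefault(...) returns 0, so at least one segment is accepted.
  boolean tryAdd(Set<String> assignedServers) {
    boolean hasBudget = assignedServers.stream()
        .allMatch(s -> _serverToNumSegmentsAddedSoFar.getOrDefault(s, 0) < _batchSizePerServer);
    if (hasBudget) {
      assignedServers.forEach(s -> _serverToNumSegmentsAddedSoFar.merge(s, 1, Integer::sum));
    }
    return hasBudget;
  }
}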




Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-28 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2063997789


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1445,336 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, isStrictRealtimeSegmentAssignment,
+LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
+   *
+   * For strict replica group routing only (where the segment assignment is 
not StrictRealtimeSegmentAssignment)
+   * if batching is enabled, don't group the assignment by partitionId, since 
the segments of the same partitionId do
+   * not need to be assigned to the same servers. For strict replica group 
routing with strict replica group
+   * assignment on the other hand, group the assignment by partitionId since a 
partition must move as a whole, and they
+   * have the same servers assigned across all segments belonging to the same 
partitionId.
+   *
+   * TODO: Ideally if strict replica group routing is enabled then 
StrictRealtimeSegmentAssignment should be used, but
+   *   this is not enforced in the code today. Once enforcement is added, 
there will no longer be any need to
+   *   handle strict replica group routing only vs. strict replica group 
routing + assignment. Remove the
+   *   getNextStrictReplicaGroupRoutingOnlyAssignment() function.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
-return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
-: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, boolean 
isStrictRealtimeSegmentAssignment, Logger tableRebalanceLogger) {
+return (enableStrictReplicaGroup && isStrictRealtimeSegmentAssignment)
+? getNextStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas, lowDiskMode,
+batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher, 
tableRebalanceLogger)
+: enableStrictReplicaGroup
+? 
getNextStrictReplicaGroupRoutingOnlyAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, tableRebalanceLogger)
+: getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-25 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059622741


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
 return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+minAvailableReplicas, lowDiskMode, batchSizePerServer, 
segmentPartitionIdMap, partitionIdFetcher,
+tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+} else {
+  partitionIdToCurrentAssignmentMap =
+  getPartitionIdToCurrentAssignmentMap(currentAssignment, 
segmentPartitionIdMap, partitionIdFetcher);
+}
 Map, Set>, Set> assignmentMap = new 
HashMap<>();
 Map, Set> availableInstancesMap = new HashMap<>();
-for (Map.Entry> entry : 
currentAssignment.entrySet()) {
-  String segmentName = entry.getKey();
-  Map currentInstanceStateMap = entry.getValue();
-  Map targetInstanceStateMap = 
targetAssignment.get(segmentName);
-  SingleSegmentAssignment assignment =
-  getNextSingleSegmentAssignment(currentInstanceStateMap, 
targetInstanceStateMap, minAvailableReplicas,
-  lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-  Set assignedInstances = assignment._instanceStateMap.keySet();
-  Set availableInstances = assignment._availableInstances;
-  availableInstancesMap.compute(assignedInstances, (k, 
currentAvailableInstances) -> {
-if (currentAvailableInstances == null) {
-  // First segment assigned to these instances, use the new assignment 
and update the available instances
-  nextAssignment.put(segmentName, assignment._instanceStateMap);
-  updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, 
currentInstanceStateMap.keySet(), k);
-  return availableInstances;
-} else {
-  // There are other segments assigned to the same instances, check 
the available instances to see if adding the
-  // new assignment can still hold the minimum available replicas 
requirement
-  availableInstances.retainAll(currentAvailableInstances);
-

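For readers skimming the quoted loop, here is a minimal sketch of the acceptance rule it implements (the class and method names are illustrative, not the PR's code): segments that land on the same set of assigned instances share a running intersection of "still available" instances, and a move is only accepted if that intersection keeps at least minAvailableReplicas servers.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the strict replica-group check: mirrors availableInstances.retainAll(...) in the
// quoted code, where the caller falls back to the current assignment when the intersection gets too small.
final class StrictReplicaGroupCheck {
  private StrictReplicaGroupCheck() {
  }

  static boolean canAcceptMove(Set<String> availableForSegment, Set<String> availableForGroup,
      int minAvailableReplicas) {
    Set<String> intersection = new HashSet<>(availableForSegment);
    intersection.retainAll(availableForGroup);
    return intersection.size() >= minAvailableReplicas;
  }
}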
Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059084112


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
 return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+minAvailableReplicas, lowDiskMode, batchSizePerServer, 
segmentPartitionIdMap, partitionIdFetcher,
+tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+} else {
+  partitionIdToCurrentAssignmentMap =
+  getPartitionIdToCurrentAssignmentMap(currentAssignment, 
segmentPartitionIdMap, partitionIdFetcher);
+}
 Map, Set>, Set> assignmentMap = new 
HashMap<>();
 Map, Set> availableInstancesMap = new HashMap<>();
-for (Map.Entry> entry : 
currentAssignment.entrySet()) {
-  String segmentName = entry.getKey();
-  Map currentInstanceStateMap = entry.getValue();
-  Map targetInstanceStateMap = 
targetAssignment.get(segmentName);
-  SingleSegmentAssignment assignment =
-  getNextSingleSegmentAssignment(currentInstanceStateMap, 
targetInstanceStateMap, minAvailableReplicas,
-  lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-  Set assignedInstances = assignment._instanceStateMap.keySet();
-  Set availableInstances = assignment._availableInstances;
-  availableInstancesMap.compute(assignedInstances, (k, 
currentAvailableInstances) -> {
-if (currentAvailableInstances == null) {
-  // First segment assigned to these instances, use the new assignment 
and update the available instances
-  nextAssignment.put(segmentName, assignment._instanceStateMap);
-  updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, 
currentInstanceStateMap.keySet(), k);
-  return availableInstances;
-} else {
-  // There are other segments assigned to the same instances, check 
the available instances to see if adding the
-  // new assignment can still hold the minimum available replicas 
requirement
-  availableInstances.retainAll(currentAvailableInstances);
-

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059154741


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String 
tableNameWithType, String segment
 }
   }
 
+  /**
+   * Uses the default LOGGER
+   */
+  @VisibleForTesting
+  static Map> getNextAssignment(Map> currentAssignment,
+  Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher) {
+return getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
+lowDiskMode, batchSizePerServer, segmentPartitionIdMap, 
partitionIdFetcher, LOGGER);
+  }
+
   /**
* Returns the next assignment for the table based on the current assignment 
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group 
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure 
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement, 
use the current assignment for the segment.
*/
-  @VisibleForTesting
-  static Map> getNextAssignment(Map> currentAssignment,
+  private static Map> 
getNextAssignment(Map> currentAssignment,
   Map> targetAssignment, int 
minAvailableReplicas, boolean enableStrictReplicaGroup,
-  boolean lowDiskMode) {
+  boolean lowDiskMode, int batchSizePerServer, 
Object2IntOpenHashMap segmentPartitionIdMap,
+  PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
 return enableStrictReplicaGroup ? 
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
-minAvailableReplicas, lowDiskMode)
+minAvailableReplicas, lowDiskMode, batchSizePerServer, 
segmentPartitionIdMap, partitionIdFetcher,
+tableRebalanceLogger)
 : getNextNonStrictReplicaGroupAssignment(currentAssignment, 
targetAssignment, minAvailableReplicas,
-lowDiskMode);
+lowDiskMode, batchSizePerServer);
   }
 
   private static Map> 
getNextStrictReplicaGroupAssignment(
   Map> currentAssignment, Map> targetAssignment,
-  int minAvailableReplicas, boolean lowDiskMode) {
+  int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+  Object2IntOpenHashMap segmentPartitionIdMap, PartitionIdFetcher 
partitionIdFetcher,
+  Logger tableRebalanceLogger) {
 Map> nextAssignment = new TreeMap<>();
 Map numSegmentsToOffloadMap = 
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+Map>> 
partitionIdToCurrentAssignmentMap;
+if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+  // Don't calculate the partition id to current assignment mapping if 
batching is disabled since
+  // we want to update the next assignment based on all partitions in this 
case
+  partitionIdToCurrentAssignmentMap = new TreeMap<>();
+  partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+} else {
+  partitionIdToCurrentAssignmentMap =
+  getPartitionIdToCurrentAssignmentMap(currentAssignment, 
segmentPartitionIdMap, partitionIdFetcher);
+}
 Map, Set>, Set> assignmentMap = new 
HashMap<>();
 Map, Set> availableInstancesMap = new HashMap<>();
-for (Map.Entry> entry : 
currentAssignment.entrySet()) {
-  String segmentName = entry.getKey();
-  Map currentInstanceStateMap = entry.getValue();
-  Map targetInstanceStateMap = 
targetAssignment.get(segmentName);
-  SingleSegmentAssignment assignment =
-  getNextSingleSegmentAssignment(currentInstanceStateMap, 
targetInstanceStateMap, minAvailableReplicas,
-  lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
-  Set assignedInstances = assignment._instanceStateMap.keySet();
-  Set availableInstances = assignment._availableInstances;
-  availableInstancesMap.compute(assignedInstances, (k, 
currentAvailableInstances) -> {
-if (currentAvailableInstances == null) {
-  // First segment assigned to these instances, use the new assignment 
and update the available instances
-  nextAssignment.put(segmentName, assignment._instanceStateMap);
-  updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap, 
currentInstanceStateMap.keySet(), k);
-  return availableInstances;
-} else {
-  // There are other segments assigned to the same instances, check 
the available instances to see if adding the
-  // new assignment can still hold the minimum available replicas 
requirement
-  availableInstances.retainAll(currentAvailableInstances);
-

Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059114227



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059113164



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


J-HowHuang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059110355



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059103904



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059094890



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059080890


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 externalViewStabilizationTimeoutInMs);
 int expectedVersion = currentIdealState.getRecord().getVersion();
 
+// Cache segment partition id to avoid ZK reads. Similar behavior as cache 
used in StrictReplicaGroupAssignment
+// NOTE:
+// 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable()
+//can be invoked multiple times when the ideal state changes during 
the rebalance process.
+// 2. The cache won't be refreshed when an existing segment is replaced 
with a segment from a different partition.
+//Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because
+//it will cause the segment being served by the wrong servers. If this 
happens during the table rebalance,
+//another rebalance might be needed to fix the assignment.
+Object2IntOpenHashMap segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();

Review Comment:
   I am not against the idea, but I want some thoughts from @Jackie-Jiang on this. That cache is private to `StrictRealtimeSegmentAssignment` and not relevant for other segment assignments, so I definitely don't want to expose an interface method to access it. The other option is to make it public, but I think that leaks the internal behavior of `StrictRealtimeSegmentAssignment` to the outside.
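   To make the trade-off concrete, the alternative this PR takes (sketched below from the quoted diff; variable names follow the quoted code, the surrounding wiring is simplified) keeps the cache local to the rebalance run and threads it through `getNextAssignment`, so nothing has to be exposed from `StrictRealtimeSegmentAssignment`:

       // Simplified sketch of the rebalance-local alternative: the cache lives in doRebalance()
       // and is passed down to the assignment computation instead of being read from the
       // segment assignment object.
       Object2IntOpenHashMap<String> segmentPartitionIdMap = new Object2IntOpenHashMap<>();
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas,
               enableStrictReplicaGroup, lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
               partitionIdFetcher);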



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059084112



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-24 Thread via GitHub


J-HowHuang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2058977515


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 externalViewStabilizationTimeoutInMs);
 int expectedVersion = currentIdealState.getRecord().getVersion();
 
+// Cache segment partition id to avoid ZK reads. Similar behavior as cache 
used in StrictReplicaGroupAssignment
+// NOTE:
+// 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable()
+//can be invoked multiple times when the ideal state changes during 
the rebalance process.
+// 2. The cache won't be refreshed when an existing segment is replaced 
with a segment from a different partition.
+//Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because
+//it will cause the segment being served by the wrong servers. If this 
happens during the table rebalance,
+//another rebalance might be needed to fix the assignment.
+Object2IntOpenHashMap segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();

Review Comment:
   Is it possible to get the same cache map object from `segmentAssignment` if it's an instance of `StrictRealtimeSegmentAssignment`? IIUC the cache would already be available while computing the next assignment.
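   For illustration, the kind of reuse being suggested might look roughly like the sketch below; note that `getSegmentPartitionIdCache()` is a hypothetical accessor that `StrictRealtimeSegmentAssignment` does not expose today, so this is only a sketch of the idea:

       // Hypothetical sketch only: reuse the partition-id cache already built while computing
       // the target assignment instead of repopulating a separate map during rebalance.
       Object2IntOpenHashMap<String> segmentPartitionIdMap;
       if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
         // getSegmentPartitionIdCache() is assumed here and does not exist in the current code
         segmentPartitionIdMap =
             ((StrictRealtimeSegmentAssignment) segmentAssignment).getSegmentPartitionIdCache();
       } else {
         // Fall back to a rebalance-local cache for other segment assignment strategies
         segmentPartitionIdMap = new Object2IntOpenHashMap<>();
       }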




Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-22 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2055214455


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -598,9 +611,12 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
 tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
   }
+  String partitionColumn = 
TableConfigUtils.getPartitionColumn(tableConfig);
   Map> nextAssignment =
   getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,

Review Comment:
   done



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -624,6 +624,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "
+  + "replica group based assignment, this is a per-server best effort 
value since each partition of a replica "
+  + "group must be moved as a whole and at least one partition in a 
replica group should be moved. A value of "
+  + "Integer.MAX_VALUE is used to indicate an unlimited batch size, 
which is the non-batching behavior.")
+  @DefaultValue("2147483647") @QueryParam("batchSizePerServer") int 
batchSizePerServer,

Review Comment:
   done
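   As a usage note, once this lands, a rebalance invocation that opts into batching might look something like the following (host, table name, and the other query parameter values are placeholders; only `batchSizePerServer` is the new knob discussed in this thread):

       # Rebalance an OFFLINE table, allowing at most 100 segment adds per server per IdealState update step
       curl -X POST "http://localhost:9000/tables/myTable/rebalance?type=OFFLINE&dryRun=false&batchSizePerServer=100"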



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-22 Thread via GitHub


somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2055075416


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 externalViewStabilizationTimeoutInMs);
 int expectedVersion = currentIdealState.getRecord().getVersion();
 
+// Cache segment partition id to avoid ZK reads. Similar behavior as cache 
used in StrictReplicaGroupAssignment
+// NOTE:
+// 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable()
+//can be invoked multiple times when the ideal state changes during 
the rebalance process.
+// 2. The cache won't be refreshed when an existing segment is replaced 
with a segment from a different partition.
+//Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because
+//it will cause the segment being served by the wrong servers. If this 
happens during the table rebalance,
+//another rebalance might be needed to fix the assignment.
+Object2IntOpenHashMap segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();

Review Comment:
   This is actually picked up from `StrictRealtimeSegmentAssignment`, which has this optimization to reduce the overhead of computing the partitionId in case it has to be fetched from SegmentZkMetadata. I decided to add it here as well, since otherwise that optimization would essentially be undone. I thought of exposing it from `StrictRealtimeSegmentAssignment` too, but that would require an interface change, so I decided against it 😅
   
   Reference code:
   
https://github.com/apache/pinot/blob/9f030bb0f20de69595156c18188946b2d2876e1c/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java#L69
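   For readers following along, the pattern being borrowed boils down to memoizing the partition id per segment for the duration of the rebalance, roughly as in the sketch below (method and fetcher names are illustrative, not the exact ones in the PR):

       // Minimal sketch of the memoization: resolve the partition id once per segment and cache it,
       // so repeated getNextAssignment() passes in the same rebalance run avoid extra ZK metadata reads.
       private static int fetchOrComputePartitionId(String segmentName,
           Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher partitionIdFetcher) {
         if (segmentPartitionIdMap.containsKey(segmentName)) {
           return segmentPartitionIdMap.getInt(segmentName);
         }
         int partitionId = partitionIdFetcher.fetch(segmentName);  // fetch() name is assumed for the sketch
         segmentPartitionIdMap.put(segmentName, partitionId);
         return partitionId;
       }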
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-22 Thread via GitHub


Jackie-Jiang commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2054996050


##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -479,6 +482,16 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 externalViewStabilizationTimeoutInMs);
 int expectedVersion = currentIdealState.getRecord().getVersion();
 
+// Cache segment partition id to avoid ZK reads. Similar behavior as cache 
used in StrictReplicaGroupAssignment
+// NOTE:
+// 1. This cache is used for table rebalance only, but not segment 
assignment. During rebalance, rebalanceTable()
+//can be invoked multiple times when the ideal state changes during 
the rebalance process.
+// 2. The cache won't be refreshed when an existing segment is replaced 
with a segment from a different partition.
+//Replacing a segment with a segment from a different partition should 
not be allowed for upsert table because
+//it will cause the segment being served by the wrong servers. If this 
happens during the table rebalance,
+//another rebalance might be needed to fix the assignment.
+Object2IntOpenHashMap segmentPartitionIdMap = new 
Object2IntOpenHashMap<>();

Review Comment:
   For my understanding, is this introduced to reduce the overhead of computing 
the partition id for the same segment multiple times? Does this improvement 
also apply to the current algorithm?



##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##
@@ -598,9 +611,12 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
 "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
 tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
   }
+  String partitionColumn = 
TableConfigUtils.getPartitionColumn(tableConfig);
   Map> nextAssignment =
   getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,

Review Comment:
   It seems most of the new arguments are used to calculate the partition id; consider passing a function (lambda) instead to simplify this.
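   A rough sketch of what the lambda-style approach could look like (the actual `PartitionIdFetcher` added in the PR may differ in name and signature, and `resolvePartitionId(...)` below is a placeholder for however the id is derived from the segment name or its ZK metadata):

       // Functional interface so only the fetcher and the cache need to travel through getNextAssignment()
       @FunctionalInterface
       interface PartitionIdFetcher {
         int fetch(String segmentName);
       }

       // Call site: table-specific details (table name, partition column, metadata access) are captured once
       PartitionIdFetcher partitionIdFetcher =
           segmentName -> resolvePartitionId(segmentName, tableNameWithType, partitionColumn);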



##
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##
@@ -624,6 +624,13 @@ public RebalanceResult rebalance(
   + "more servers.") @DefaultValue("false") @QueryParam("lowDiskMode") 
boolean lowDiskMode,
   @ApiParam(value = "Whether to use best-efforts to rebalance (not fail 
the rebalance when the no-downtime "
   + "contract cannot be achieved)") @DefaultValue("false") 
@QueryParam("bestEfforts") boolean bestEfforts,
+  @ApiParam(value = "How many maximum segment adds per server to update in 
the IdealState in each step. For "
+  + "non-strict replica group based assignment, this number will be 
the closest possible without splitting up "
+  + "a single segment's step's replicas across steps (so some servers 
may get fewer segments). For strict "
+  + "replica group based assignment, this is a per-server best effort 
value since each partition of a replica "
+  + "group must be moved as a whole and at least one partition in a 
replica group should be moved. A value of "
+  + "Integer.MAX_VALUE is used to indicate an unlimited batch size, 
which is the non-batching behavior.")
+  @DefaultValue("2147483647") @QueryParam("batchSizePerServer") int 
batchSizePerServer,

Review Comment:
   Let's make the default `0` or `-1` and treat any non-positive value as unlimited.
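   A minimal sketch of that convention when reading the config (getter and variable names here are illustrative):

       // Treat any non-positive batchSizePerServer as "batching disabled", i.e. the existing behavior
       int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();  // assumed getter name
       boolean batchingEnabled = batchSizePerServer > 0;
       int effectiveBatchSizePerServer = batchingEnabled ? batchSizePerServer : Integer.MAX_VALUE;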



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



Re: [PR] Table Rebalance: Add support for server-level segment batching [pinot]

2025-04-22 Thread via GitHub


codecov-commenter commented on PR #15617:
URL: https://github.com/apache/pinot/pull/15617#issuecomment-2822368891

   ## 
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/15617?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `98.16514%` with `2 lines` in your changes 
missing coverage. Please review.
   > Project coverage is 62.75%. Comparing base 
[(`9f030bb`)](https://app.codecov.io/gh/apache/pinot/commit/9f030bb0f20de69595156c18188946b2d2876e1c?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`e0c7281`)](https://app.codecov.io/gh/apache/pinot/commit/e0c72819555cd8c4781a5d6474bcaa98b6256223?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   
   | [Files with missing 
lines](https://app.codecov.io/gh/apache/pinot/pull/15617?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Patch % | Lines |
   |---|---|---|
   | 
[...ntroller/helix/core/rebalance/TableRebalancer.java](https://app.codecov.io/gh/apache/pinot/pull/15617?src=pr&el=tree&filepath=pinot-controller%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcontroller%2Fhelix%2Fcore%2Frebalance%2FTableRebalancer.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYmFsYW5jZS9UYWJsZVJlYmFsYW5jZXIuamF2YQ==)
 | 98.00% | [0 Missing and 2 partials :warning: 
](https://app.codecov.io/gh/apache/pinot/pull/15617?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 |
   
   Additional details and impacted files
   
   
   ```diff
   @@ Coverage Diff  @@
   ## master   #15617  +/-   ##
   
   - Coverage 62.77%   62.75%   -0.02% 
   + Complexity 1384 1374  -10 
   
 Files  2864 2864  
 Lines162734   162812  +78 
 Branches  2491724934  +17 
   
   + Hits 102151   102175  +24 
   - Misses5287552926  +51 
   - Partials   7708 7711   +3 
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration1](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[integration2](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-11](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `?` | |
   | 
[java-21](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `62.75% <98.16%> (+0.02%)` | :arrow_up: |
   | 
[skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `62.74% <98.16%> (-0.02%)` | :arrow_down: |
   | 
[skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `62.73% <98.16%> (+0.01%)` | :arrow_up: |
   | 
[temurin](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `62.75% <98.16%> (-0.02%)` | :arrow_down: |
   | 
[unittests](https://app.codecov.io/gh/apache/pinot/pull/15617/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_ter