This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch rename_replica_group in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 46bb4585b274b548b353441f845595c3d83d9e74 Author: Jackie (Xiaotian) Jiang <xaji...@linkedin.com> AuthorDate: Mon Aug 19 14:26:54 2019 -0700 [Instance Assignment] Rename instance level replica to replica-group To reduce the confusion of the code: - Replica-group: a set of instances that serves one replica of all the segments - Replica: one copy of the segment --- .../config/instance/InstanceAssignmentConfig.java | 14 +-- .../instance/InstanceAssignmentConfigUtils.java | 16 +-- ...va => InstanceReplicaGroupPartitionConfig.java} | 32 +++--- .../pinot/common/config/TableConfigTest.java | 27 ++--- .../helix/core/assignment/InstancePartitions.java | 33 +++--- .../instance/InstanceAssignmentDriver.java | 12 +- ... => InstanceReplicaGroupPartitionSelector.java} | 127 +++++++++++---------- .../segment/OfflineSegmentAssignment.java | 58 +++++----- .../segment/RealtimeSegmentAssignment.java | 39 ++++--- .../assignment/segment/SegmentAssignmentUtils.java | 25 ++-- ...PinotInstanceAssignmentRestletResourceTest.java | 62 +++++----- .../instance/InstanceAssignmentTest.java | 106 ++++++++--------- .../OfflineReplicaGroupSegmentAssignmentTest.java | 85 +++++++------- ...altimeNonReplicaGroupSegmentAssignmentTest.java | 19 +-- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 50 ++++---- .../segment/SegmentAssignmentUtilsTest.java | 102 ++++++++--------- 16 files changed, 414 insertions(+), 393 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java index 9aa5a9c..6d08f21 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java @@ -38,10 +38,10 @@ public class InstanceAssignmentConfig { @NestedConfig private InstanceConstraintConfig _constraintConfig; - @ConfigKey("replicaPartitionConfig") - @ConfigDoc(value = "Configuration for the instance replica and partition of the instance assignment", mandatory = true) + @ConfigKey("replicaGroupPartitionConfig") + @ConfigDoc(value = "Configuration for the instance replica-group and partition of the instance assignment", mandatory = true) @NestedConfig - private InstanceReplicaPartitionConfig _replicaPartitionConfig; + private InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; @JsonProperty public InstanceTagPoolConfig getTagPoolConfig() { @@ -64,12 +64,12 @@ public class InstanceAssignmentConfig { } @JsonProperty - public InstanceReplicaPartitionConfig getReplicaPartitionConfig() { - return _replicaPartitionConfig; + public InstanceReplicaGroupPartitionConfig getReplicaGroupPartitionConfig() { + return _replicaGroupPartitionConfig; } @JsonProperty - public void setReplicaPartitionConfig(InstanceReplicaPartitionConfig replicaPartitionConfig) { - _replicaPartitionConfig = replicaPartitionConfig; + public void setReplicaGroupPartitionConfig(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig) { + _replicaGroupPartitionConfig = replicaGroupPartitionConfig; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java index 446f107..91c960c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java @@ -42,14 +42,14 @@ public class InstanceAssignmentConfigUtils { Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); switch (instancePartitionsType) { - // Allow OFFLINE instance assignment if the OFFLINE table has it configured or (for backward-compatibility) is + // Allow OFFLINE instance assignment if the offline table has it configured or (for backward-compatibility) is // using replica-group segment assignment case OFFLINE: return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap != null && instanceAssignmentConfigMap .containsKey(InstancePartitionsType.OFFLINE)) || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy())); - // Allow CONSUMING/COMPLETED instance assignment if the REALTIME table has it configured + // Allow CONSUMING/COMPLETED instance assignment if the real-time table has it configured case CONSUMING: case COMPLETED: return tableType == TableType.REALTIME && (instanceAssignmentConfigMap != null && instanceAssignmentConfigMap @@ -77,17 +77,17 @@ public class InstanceAssignmentConfigUtils { } // Generate default instance assignment config if it does not exist - // Only allow default config for OFFLINE table with replica-group segment assignment for backward-compatibility + // Only allow default config for offline table with replica-group segment assignment for backward-compatibility InstanceAssignmentConfig instanceAssignmentConfig = new InstanceAssignmentConfig(); InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(); tagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(tableConfig.getTenantConfig().getServer())); instanceAssignmentConfig.setTagPoolConfig(tagPoolConfig); - InstanceReplicaPartitionConfig replicaPartitionConfig = new InstanceReplicaPartitionConfig(); + InstanceReplicaGroupPartitionConfig replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(); replicaPartitionConfig.setReplicaGroupBased(true); SegmentsValidationAndRetentionConfig segmentConfig = tableConfig.getValidationConfig(); - replicaPartitionConfig.setNumReplicas(segmentConfig.getReplicationNumber()); + replicaPartitionConfig.setNumReplicaGroups(segmentConfig.getReplicationNumber()); ReplicaGroupStrategyConfig replicaGroupStrategyConfig = segmentConfig.getReplicaGroupStrategyConfig(); Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to find the replica-group strategy config"); String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn(); @@ -99,10 +99,10 @@ public class InstanceAssignmentConfigUtils { replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition()); } else { // If partition column is not configured, use replicaGroupStrategyConfig.getNumInstancesPerPartition() as - // number of instances per replica for backward-compatibility - replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition()); + // number of instances per replica-group for backward-compatibility + replicaPartitionConfig.setNumInstancesPerReplicaGroup(replicaGroupStrategyConfig.getNumInstancesPerPartition()); } - instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig); + instanceAssignmentConfig.setReplicaGroupPartitionConfig(replicaPartitionConfig); return instanceAssignmentConfig; } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java similarity index 75% rename from pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java rename to pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java index 65d1e49..dadfd8a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java @@ -25,7 +25,7 @@ import org.apache.pinot.common.config.ConfigKey; @JsonIgnoreProperties(ignoreUnknown = true) -public class InstanceReplicaPartitionConfig { +public class InstanceReplicaGroupPartitionConfig { @ConfigKey("replicaGroupBased") @ConfigDoc("Whether to use replica-group based selection, false by default") @@ -35,20 +35,20 @@ public class InstanceReplicaPartitionConfig { @ConfigDoc("Number of instances to select for non-replica-group based selection, select all instances if not specified") private int _numInstances; - @ConfigKey("numReplicas") - @ConfigDoc("Number of replicas (replica-groups) for replica-group based selection") - private int _numReplicas; + @ConfigKey("numReplicaGroups") + @ConfigDoc("Number of replica-groups for replica-group based selection") + private int _numReplicaGroups; - @ConfigKey("numServersPerReplica") - @ConfigDoc("Number of instances per replica for replica-group based selection, select as many instances as possible if not specified") - private int _numInstancesPerReplica; + @ConfigKey("numServersPerReplicaGroup") + @ConfigDoc("Number of instances per replica-group for replica-group based selection, select as many instances as possible if not specified") + private int _numInstancesPerReplicaGroup; @ConfigKey("numPartitions") @ConfigDoc("Number of partitions for replica-group based selection, do not partition the replica-group (1 partition) if not specified") private int _numPartitions; @ConfigKey("numServersPerPartition") - @ConfigDoc("Number of instances per partition (within a replica) for replica-group based selection, select all instances if not specified") + @ConfigDoc("Number of instances per partition (within a replica-group) for replica-group based selection, select all instances if not specified") private int _numInstancesPerPartition; @JsonProperty @@ -72,23 +72,23 @@ public class InstanceReplicaPartitionConfig { } @JsonProperty - public int getNumReplicas() { - return _numReplicas; + public int getNumReplicaGroups() { + return _numReplicaGroups; } @JsonProperty - public void setNumReplicas(int numReplicas) { - _numReplicas = numReplicas; + public void setNumReplicaGroups(int numReplicaGroups) { + _numReplicaGroups = numReplicaGroups; } @JsonProperty - public int getNumInstancesPerReplica() { - return _numInstancesPerReplica; + public int getNumInstancesPerReplicaGroup() { + return _numInstancesPerReplicaGroup; } @JsonProperty - public void setNumInstancesPerReplica(int numInstancesPerReplica) { - _numInstancesPerReplica = numInstancesPerReplica; + public void setNumInstancesPerReplicaGroup(int numInstancesPerReplicaGroup) { + _numInstancesPerReplicaGroup = numInstancesPerReplicaGroup; } @JsonProperty diff --git a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java index b024d16..2675de8 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; import org.apache.pinot.common.config.instance.InstanceConstraintConfig; -import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; import org.apache.pinot.common.data.StarTreeIndexSpec; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; @@ -358,11 +358,11 @@ public class TableConfigTest { constraintConfig.setConstraints(Arrays.asList("constraint1", "constraint2")); instanceAssignmentConfig.setConstraintConfig(constraintConfig); - InstanceReplicaPartitionConfig replicaPartitionConfig = new InstanceReplicaPartitionConfig(); - replicaPartitionConfig.setReplicaGroupBased(true); - replicaPartitionConfig.setNumReplicas(3); - replicaPartitionConfig.setNumInstancesPerReplica(5); - instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(); + replicaGroupPartitionConfig.setReplicaGroupBased(true); + replicaGroupPartitionConfig.setNumReplicaGroups(3); + replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(5); + instanceAssignmentConfig.setReplicaGroupPartitionConfig(replicaGroupPartitionConfig); TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE, instanceAssignmentConfig)).build(); @@ -460,12 +460,13 @@ public class TableConfigTest { InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig(); assertEquals(constraintConfig.getConstraints(), Arrays.asList("constraint1", "constraint2")); - InstanceReplicaPartitionConfig replicaPartitionConfig = instanceAssignmentConfig.getReplicaPartitionConfig(); - assertTrue(replicaPartitionConfig.isReplicaGroupBased()); - assertEquals(replicaPartitionConfig.getNumInstances(), 0); - assertEquals(replicaPartitionConfig.getNumReplicas(), 3); - assertEquals(replicaPartitionConfig.getNumInstancesPerReplica(), 5); - assertEquals(replicaPartitionConfig.getNumPartitions(), 0); - assertEquals(replicaPartitionConfig.getNumInstancesPerPartition(), 0); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = + instanceAssignmentConfig.getReplicaGroupPartitionConfig(); + assertTrue(replicaGroupPartitionConfig.isReplicaGroupBased()); + assertEquals(replicaGroupPartitionConfig.getNumInstances(), 0); + assertEquals(replicaGroupPartitionConfig.getNumReplicaGroups(), 3); + assertEquals(replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(), 5); + assertEquals(replicaGroupPartitionConfig.getNumPartitions(), 0); + assertEquals(replicaGroupPartitionConfig.getNumInstancesPerPartition(), 0); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java index 4060f4b..53c48d4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java @@ -33,19 +33,21 @@ import org.apache.pinot.common.utils.JsonUtils; /** * Instance (server) partitions for the table. * - * <p>The instance partitions is stored as a map from partition of the format: {@code <partitionId>_<replicaId>} to + * <p>The instance partitions is stored as a map from partition of the format: {@code <partitionId>_<replicaGroupId>} to * list of server instances, and is persisted under the ZK path: {@code <cluster>/PROPERTYSTORE/INSTANCE_PARTITIONS}. + * <p>When partition is not enabled, all instances will be stored as partition 0. + * <p>When replica-group is not enabled, all instances will be stored as replica-group 0. * <p>The segment assignment will be based on the instance partitions of the table. */ @JsonIgnoreProperties(ignoreUnknown = true) public class InstancePartitions { - private static final char PARTITION_REPLICA_SEPARATOR = '_'; + private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_'; // Name will be of the format "<rawTableName>_<type>", e.g. "table_OFFLINE", "table_CONSUMING", "table_COMPLETED" private final String _name; private final Map<String, List<String>> _partitionToInstancesMap; private int _numPartitions; - private int _numReplicas; + private int _numReplicaGroups; public InstancePartitions(String name) { _name = name; @@ -58,11 +60,11 @@ public class InstancePartitions { _name = name; _partitionToInstancesMap = partitionToInstancesMap; for (String key : partitionToInstancesMap.keySet()) { - int splitterIndex = key.indexOf(PARTITION_REPLICA_SEPARATOR); - int partition = Integer.parseInt(key.substring(0, splitterIndex)); - int replica = Integer.parseInt(key.substring(splitterIndex + 1)); - _numPartitions = Integer.max(_numPartitions, partition + 1); - _numReplicas = Integer.max(_numReplicas, replica + 1); + int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR); + int partitionId = Integer.parseInt(key.substring(0, separatorIndex)); + int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1)); + _numPartitions = Integer.max(_numPartitions, partitionId + 1); + _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1); } } @@ -82,19 +84,20 @@ public class InstancePartitions { } @JsonIgnore - public int getNumReplicas() { - return _numReplicas; + public int getNumReplicaGroups() { + return _numReplicaGroups; } - public List<String> getInstances(int partitionId, int replicaId) { - return _partitionToInstancesMap.get(Integer.toString(partitionId) + PARTITION_REPLICA_SEPARATOR + replicaId); + public List<String> getInstances(int partitionId, int replicaGroupId) { + return _partitionToInstancesMap + .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId); } - public void setInstances(int partitionId, int replicaId, List<String> instances) { - String key = Integer.toString(partitionId) + PARTITION_REPLICA_SEPARATOR + replicaId; + public void setInstances(int partitionId, int replicaGroupId, List<String> instances) { + String key = Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId; _partitionToInstancesMap.put(key, instances); _numPartitions = Integer.max(_numPartitions, partitionId + 1); - _numReplicas = Integer.max(_numReplicas, replicaId + 1); + _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1); } public static InstancePartitions fromZNRecord(ZNRecord znRecord) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index a715834..b87b123 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -28,7 +28,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; import org.apache.pinot.common.config.instance.InstanceConstraintConfig; -import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; import org.apache.pinot.common.utils.InstancePartitionsType; import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; * <ul> * <li>Select instances based on the tag/pool configuration</li> * <li>Apply constraints to the instances (optional, multiple constraints can be chained up)</li> - * <li>Select instances based on the replica/partition configuration</li> + * <li>Select instances based on the replica-group/partition configuration</li> * </ul> */ public class InstanceAssignmentDriver { @@ -77,10 +77,10 @@ public class InstanceAssignmentDriver { poolToInstanceConfigsMap = constraintApplier.applyConstraint(poolToInstanceConfigsMap); } - InstanceReplicaPartitionConfig replicaPartitionConfig = assignmentConfig.getReplicaPartitionConfig(); - Preconditions.checkState(replicaPartitionConfig != null, "Instance replica/partition config is missing"); - InstanceReplicaPartitionSelector replicaPartitionSelector = - new InstanceReplicaPartitionSelector(replicaPartitionConfig, tableNameWithType); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = assignmentConfig.getReplicaGroupPartitionConfig(); + Preconditions.checkState(replicaGroupPartitionConfig != null, "Instance replica-group/partition config is missing"); + InstanceReplicaGroupPartitionSelector replicaPartitionSelector = + new InstanceReplicaGroupPartitionSelector(replicaGroupPartitionConfig, tableNameWithType); InstancePartitions instancePartitions = new InstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType))); replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java similarity index 52% rename from pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java index 90c35fb..9102d4a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java @@ -24,31 +24,30 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.helix.model.InstanceConfig; -import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The instance replica/partition selector is responsible for selecting the instances for each replica and partition. - * <p>NOTE: The replica here refers to the table level replica (replica-group), which is a set of instances that contain - * all segments for a table. + * The instance replica-group/partition selector is responsible for selecting the instances for each replica-group and + * partition. */ -public class InstanceReplicaPartitionSelector { - private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaPartitionSelector.class); +public class InstanceReplicaGroupPartitionSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class); - private final InstanceReplicaPartitionConfig _replicaPartitionConfig; + private final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig; private final String _tableNameWithType; - public InstanceReplicaPartitionSelector(InstanceReplicaPartitionConfig replicaPartitionConfig, + public InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String tableNameWithType) { - _replicaPartitionConfig = replicaPartitionConfig; + _replicaGroupPartitionConfig = replicaGroupPartitionConfig; _tableNameWithType = tableNameWithType; } /** - * Selects instances based on the replica and partition config, and stores the result into the given instance + * Selects instances based on the replica-group/partition config, and stores the result into the given instance * partitions. */ public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap, @@ -59,99 +58,107 @@ public class InstanceReplicaPartitionSelector { int tableNameHash = Math.abs(_tableNameWithType.hashCode()); List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet()); pools.sort(null); - LOGGER.info("Starting instance replica/partition selection for table: {} with hash: {} from pools: {}", + LOGGER.info("Starting instance replica-group/partition selection for table: {} with hash: {} from pools: {}", _tableNameWithType, tableNameHash, pools); - if (_replicaPartitionConfig.isReplicaGroupBased()) { + if (_replicaGroupPartitionConfig.isReplicaGroupBased()) { // Replica-group based selection - int numReplicas = _replicaPartitionConfig.getNumReplicas(); - Preconditions.checkState(numReplicas > 0, "Number of replicas must be positive"); - Map<Integer, List<Integer>> poolToReplicaIdsMap = new TreeMap<>(); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - // Pick one pool for each replica based on the table name hash + int numReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups must be positive"); + Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>(); + for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { + // Pick one pool for each replica-group based on the table name hash int pool = pools.get((tableNameHash + replicaId) % numPools); - poolToReplicaIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); + poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new ArrayList<>()).add(replicaId); } - LOGGER.info("Selecting {} replicas from pool: {} for table: {}", numReplicas, poolToReplicaIdsMap, + LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", numReplicaGroups, poolToReplicaGroupIdsMap, _tableNameWithType); - int numInstancesPerReplica = _replicaPartitionConfig.getNumInstancesPerReplica(); - if (numInstancesPerReplica > 0) { - // Check if we have enough instances if number of instances per replica is configured - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaIdsMap.entrySet()) { + int numInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + if (numInstancesPerReplicaGroup > 0) { + // Check if we have enough instances if number of instances per replica-group is configured + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { int pool = entry.getKey(); int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - int numInstancesToSelect = numInstancesPerReplica * entry.getValue().size(); + int numInstancesToSelect = numInstancesPerReplicaGroup * entry.getValue().size(); Preconditions.checkState(numInstancesToSelect <= numInstancesInPool, "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstancesInPool, numInstancesToSelect); } } else { - // Use as many instances as possible if number of instances per replica is not configured - numInstancesPerReplica = Integer.MAX_VALUE; - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaIdsMap.entrySet()) { + // Use as many instances as possible if number of instances per replica-group is not configured + numInstancesPerReplicaGroup = Integer.MAX_VALUE; + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { int pool = entry.getKey(); - int numReplicasInPool = entry.getValue().size(); + int numReplicaGroupsInPool = entry.getValue().size(); int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size(); - Preconditions.checkState(numReplicasInPool <= numInstancesInPool, - "Not enough qualified instances from pool: %s, cannot select %s replicas from %s instances", pool, - numReplicasInPool, numInstancesInPool); - numInstancesPerReplica = Math.min(numInstancesPerReplica, numInstancesInPool / numReplicasInPool); + Preconditions.checkState(numReplicaGroupsInPool <= numInstancesInPool, + "Not enough qualified instances from pool: %s, cannot select %s replica-groups from %s instances", pool, + numReplicaGroupsInPool, numInstancesInPool); + numInstancesPerReplicaGroup = + Math.min(numInstancesPerReplicaGroup, numInstancesInPool / numReplicaGroupsInPool); } } - LOGGER.info("Selecting {} instances per replica for table: {}", numInstancesPerReplica, _tableNameWithType); + LOGGER.info("Selecting {} instances per replica-group for table: {}", numInstancesPerReplicaGroup, + _tableNameWithType); - String[][] replicaIdToInstancesMap = new String[numReplicas][numInstancesPerReplica]; - for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaIdsMap.entrySet()) { + String[][] replicaGroupIdToInstancesMap = new String[numReplicaGroups][numInstancesPerReplicaGroup]; + for (Map.Entry<Integer, List<Integer>> entry : poolToReplicaGroupIdsMap.entrySet()) { List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(entry.getKey()); - List<Integer> replicaIdsInPool = entry.getValue(); + List<Integer> replicaGroupIdsInPool = entry.getValue(); - // Use round-robin to assign instances to each replica so that they get instances with similar picking priority + // Use round-robin to assign instances to each replica-group so that they get instances with similar picking + // priority + // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances per replica-group) + // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9] + // r0 r1 r0 r1 r0 r1 int instanceIdInPool = 0; - for (int instanceIdInReplica = 0; instanceIdInReplica < numInstancesPerReplica; instanceIdInReplica++) { - for (int replicaId : replicaIdsInPool) { - replicaIdToInstancesMap[replicaId][instanceIdInReplica] = + for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < numInstancesPerReplicaGroup; + instanceIdInReplicaGroup++) { + for (int replicaGroupId : replicaGroupIdsInPool) { + replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] = instanceConfigsInPool.get(instanceIdInPool++).getInstanceName(); } } } - // Assign instances within a replica to one partition if not configured - int numPartitions = _replicaPartitionConfig.getNumPartitions(); + // Assign instances within a replica-group to one partition if not configured + int numPartitions = _replicaGroupPartitionConfig.getNumPartitions(); if (numPartitions <= 0) { numPartitions = 1; } - // Assign all instances within a replica to each partition if not configured - int numInstancesPerPartition = _replicaPartitionConfig.getNumInstancesPerPartition(); + // Assign all instances within a replica-group to each partition if not configured + int numInstancesPerPartition = _replicaGroupPartitionConfig.getNumInstancesPerPartition(); if (numInstancesPerPartition > 0) { - Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplica, - "Number of instances per partition: %s must be smaller or equal to number of instances per replica: %s", - numInstancesPerPartition, numInstancesPerReplica); + Preconditions.checkState(numInstancesPerPartition <= numInstancesPerReplicaGroup, + "Number of instances per partition: %s must be smaller or equal to number of instances per replica-group: %s", + numInstancesPerPartition, numInstancesPerReplicaGroup); } else { - numInstancesPerPartition = numInstancesPerReplica; + numInstancesPerPartition = numInstancesPerReplicaGroup; } - LOGGER.info("Selecting {} partitions, {} instances per partition within a replica for table: {}", numPartitions, - numInstancesPerPartition, _tableNameWithType); + LOGGER.info("Selecting {} partitions, {} instances per partition within a replica-group for table: {}", + numPartitions, numInstancesPerPartition, _tableNameWithType); - // Assign consecutive instances within a replica to each partition. - // E.g. (within a replica, 5 instances, 3 partitions, 3 instances per partition) + // Assign consecutive instances within a replica-group to each partition. + // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances per partition) // [i0, i1, i2, i3, i4] // p0 p0 p0 p1 p1 // p1 p2 p2 p2 - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - int instanceIdInReplica = 0; + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + int instanceIdInReplicaGroup = 0; for (int partitionId = 0; partitionId < numPartitions; partitionId++) { List<String> instancesInPartition = new ArrayList<>(numInstancesPerPartition); for (int instanceIdInPartition = 0; instanceIdInPartition < numInstancesPerPartition; instanceIdInPartition++) { - instancesInPartition.add(replicaIdToInstancesMap[replicaId][instanceIdInReplica]); - instanceIdInReplica = (instanceIdInReplica + 1) % numInstancesPerReplica; + instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]); + instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % numInstancesPerReplicaGroup; } instancesInPartition.sort(null); - LOGGER.info("Selecting instances: {} for replica: {}, partition: {} for table: {}", instancesInPartition, - replicaId, partitionId, _tableNameWithType); - instancePartitions.setInstances(partitionId, replicaId, instancesInPartition); + LOGGER + .info("Selecting instances: {} for replica-group: {}, partition: {} for table: {}", instancesInPartition, + replicaGroupId, partitionId, _tableNameWithType); + instancePartitions.setInstances(partitionId, replicaGroupId, instancesInPartition); } } } else { @@ -164,7 +171,7 @@ public class InstanceReplicaPartitionSelector { int numInstanceConfigs = instanceConfigs.size(); // Assign all instances if not configured - int numInstancesToSelect = _replicaPartitionConfig.getNumInstances(); + int numInstancesToSelect = _replicaGroupPartitionConfig.getNumInstances(); if (numInstancesToSelect > 0) { Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs, "Not enough qualified instances from pool: %s (%s in the pool, asked for %s)", pool, numInstanceConfigs, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java index 5dfb034..f8c7fee 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java @@ -45,21 +45,22 @@ import org.slf4j.LoggerFactory; * Segment assignment for offline table. * <ul> * <li> - * Non-replica-group based assignment (only 1 replica in instance partitions): + * Non-replica-group based assignment (only 1 replica-group in instance partitions): * <p>Assign the segment to the instance with the least number of segments. In case of a tie, assign the segment to * the instance with the smallest index in the list. Use Helix AutoRebalanceStrategy to rebalance the table. * </li> * <li> - * Replica-group based assignment (more than 1 replicas in instance partitions): - * <p>Among replicas, always mirror the assignment (pick the same index of the instance). - * <p>Within each partition, assign the segment to the servers with the least segments already assigned. In case of - * a tie, assign to the server with the smallest index in the list. Do this for one replica and mirror the - * assignment to other replicas. - * <p>To rebalance a table, within each partition, first calculate the number of segments on each server, loop over - * all the segments and keep the assignment if number of segments for the server has not been reached and track the - * not assigned segments, then assign the left-over segments to the servers with the least segments, or the smallest - * index if there is a tie. Repeat the process for all the partitions in one replica, and mirror the assignment to - * other replicas. With this greedy algorithm, the result is deterministic and with minimum segment moves. + * Replica-group based assignment (more than 1 replica-groups in instance partitions): + * <p>Among replica-groups, always mirror the assignment (pick the same index of the instance). + * <p>Within each partition, assign the segment to the instances with the least segments already assigned. In case + * of a tie, assign to the instance with the smallest index in the list. Do this for one replica-group and mirror + * the assignment to other replica-groups. + * <p>To rebalance a table, within each partition, first calculate the number of segments on each instance, loop + * over all the segments and keep the assignment if number of segments for the instance has not been reached and + * track the not assigned segments, then assign the left-over segments to the instances with the least segments, or + * the smallest index if there is a tie. Repeat the process for all the partitions in one replica-group, and mirror + * the assignment to other replica-groups. With this greedy algorithm, the result is deterministic and with minimum + * segment moves. * </li> * </ul> */ @@ -76,8 +77,9 @@ public class OfflineSegmentAssignment implements SegmentAssignment { _helixManager = helixManager; _offlineTableName = tableConfig.getTableName(); _replication = tableConfig.getValidationConfig().getReplicationNumber(); - ReplicaGroupStrategyConfig strategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); - _partitionColumn = strategyConfig != null ? strategyConfig.getPartitionColumn() : null; + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = + tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); + _partitionColumn = replicaGroupStrategyConfig != null ? replicaGroupStrategyConfig.getPartitionColumn() : null; if (_partitionColumn == null) { LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} without partition column for table: {} ", @@ -98,7 +100,7 @@ public class OfflineSegmentAssignment implements SegmentAssignment { _offlineTableName); List<String> instancesAssigned; - if (instancePartitions.getNumReplicas() == 1) { + if (instancePartitions.getNumReplicaGroups() == 1) { // Non-replica-group based assignment // Assign the segment to the instance with the least segments, or the smallest id if there is a tie @@ -118,11 +120,11 @@ public class OfflineSegmentAssignment implements SegmentAssignment { } else { // Replica-group based assignment - int numReplicas = instancePartitions.getNumReplicas(); - if (numReplicas != _replication) { + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); + if (numReplicaGroups != _replication) { LOGGER.warn( - "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", - instancePartitions.getName(), numReplicas, _replication, _offlineTableName, numReplicas); + "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicaGroups, _replication, _offlineTableName, numReplicaGroups); } // Fetch partition id from segment ZK metadata if partition column is configured @@ -152,7 +154,7 @@ public class OfflineSegmentAssignment implements SegmentAssignment { segmentPartitionId, partitionId, numPartitions, _offlineTableName); } - // First assign the segment to replica 0 + // First assign the segment to replica-group 0 List<String> instances = instancePartitions.getInstances(partitionId, 0); int[] numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); @@ -166,11 +168,11 @@ public class OfflineSegmentAssignment implements SegmentAssignment { } } - // Mirror the assignment to all replicas - instancesAssigned = new ArrayList<>(numReplicas); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + // Mirror the assignment to all replica-groups + instancesAssigned = new ArrayList<>(numReplicaGroups); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { instancesAssigned - .add(instancePartitions.getInstances(partitionId, replicaId).get(instanceIdWithLeastSegmentsAssigned)); + .add(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceIdWithLeastSegmentsAssigned)); } } @@ -188,7 +190,7 @@ public class OfflineSegmentAssignment implements SegmentAssignment { LOGGER.info("Rebalancing table: {} with instance partitions: {}", _offlineTableName, instancePartitions); Map<String, Map<String, String>> newAssignment; - if (instancePartitions.getNumReplicas() == 1) { + if (instancePartitions.getNumReplicaGroups() == 1) { // Non-replica-group based assignment List<String> instances = @@ -198,11 +200,11 @@ public class OfflineSegmentAssignment implements SegmentAssignment { } else { // Replica-group based assignment - int numReplicas = instancePartitions.getNumReplicas(); - if (numReplicas != _replication) { + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); + if (numReplicaGroups != _replication) { LOGGER.warn( - "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", - instancePartitions.getName(), numReplicas, _replication, _offlineTableName, numReplicas); + "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicaGroups, _replication, _offlineTableName, numReplicaGroups); } if (_partitionColumn == null) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index ab279fb..35aa136 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -45,9 +45,9 @@ import org.slf4j.LoggerFactory; * differences: * <ul> * <li> - * 1. Within a replica, all segments of the same partition (steam partition) are always assigned to exactly one - * server, and because of that we can directly assign or rebalance the CONSUMING segments to the servers based - * on the partition id + * 1. Within a replica-group, all segments of the same partition (steam partition) are always assigned to the + * same exactly one instance, and because of that we can directly assign or rebalance the CONSUMING segments to + * the instances based on the partition id * </li> * <li> * 2. Partition id for an instance is derived from the index of the instance (within the replica-group for @@ -99,10 +99,10 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { private List<String> assignSegment(String segmentName, InstancePartitions instancePartitions) { int partitionId = new LLCSegmentName(segmentName).getPartitionId(); - if (instancePartitions.getNumReplicas() == 1) { + if (instancePartitions.getNumReplicaGroups() == 1) { // Non-replica-group based assignment: // Uniformly spray the partitions and replicas across the instances. - // E.g. (6 servers, 3 partitions, 4 replicas) + // E.g. (6 instances, 3 partitions, 4 replicas) // "0_0": [i0, i1, i2, i3, i4, i5 ] // p0r0 p0r1 p0r2 p1r3 p1r0 p1r1 // p1r2 p1r3 p2r0 p2r1 p2r2 p2r3 @@ -117,22 +117,22 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { return instancesAssigned; } else { // Replica-group based assignment: - // Within a replica, uniformly spray the partitions across the instances. - // E.g. (within a replica, 3 servers, 6 partitions) + // Within a replica-group, uniformly spray the partitions across the instances. + // E.g. (within a replica-group, 3 instances, 6 partitions) // "0_0": [i0, i1, i2] // p0 p1 p2 // p3 p4 p5 - int numReplicas = instancePartitions.getNumReplicas(); - if (numReplicas != _replication) { + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); + if (numReplicaGroups != _replication) { LOGGER.warn( - "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", - instancePartitions.getName(), numReplicas, _replication, _realtimeTableName, numReplicas); + "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + instancePartitions.getName(), numReplicaGroups, _replication, _realtimeTableName, numReplicaGroups); } - List<String> instancesAssigned = new ArrayList<>(numReplicas); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - List<String> instances = instancePartitions.getInstances(0, replicaId); + List<String> instancesAssigned = new ArrayList<>(numReplicaGroups); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + List<String> instances = instancePartitions.getInstances(0, replicaGroupId); instancesAssigned.add(instances.get(partitionId % instances.size())); } return instancesAssigned; @@ -156,7 +156,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { completedInstancePartitions); Map<String, Map<String, String>> newAssignment; - if (completedInstancePartitions.getNumReplicas() == 1) { + if (completedInstancePartitions.getNumReplicaGroups() == 1) { // Non-replica-group based assignment List<String> instances = SegmentAssignmentUtils @@ -166,11 +166,12 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { } else { // Replica-group based assignment - int numReplicas = completedInstancePartitions.getNumReplicas(); - if (numReplicas != _replication) { + int numReplicaGroups = completedInstancePartitions.getNumReplicaGroups(); + if (numReplicaGroups != _replication) { LOGGER.warn( - "Number of replicas in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", - completedInstancePartitions.getName(), numReplicas, _replication, _realtimeTableName, numReplicas); + "Number of replica-groups in instance partitions {}: {} does not match replication in table config: {} for table: {}, use: {}", + completedInstancePartitions.getName(), numReplicaGroups, _replication, _realtimeTableName, + numReplicaGroups); } Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 38d9681..878f913 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -73,9 +73,10 @@ class SegmentAssignmentUtils { */ static List<String> getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions instancePartitions, int replication) { - Preconditions.checkState(instancePartitions.getNumReplicas() == 1 && instancePartitions.getNumPartitions() == 1, - "Instance partitions: %s should contain 1 replica and 1 partition for non-replica-group based assignment", - instancePartitions.getName()); + Preconditions + .checkState(instancePartitions.getNumReplicaGroups() == 1 && instancePartitions.getNumPartitions() == 1, + "Instance partitions: %s should contain 1 replica and 1 partition for non-replica-group based assignment", + instancePartitions.getName()); List<String> instances = instancePartitions.getInstances(0, 0); int numInstances = instances.size(); Preconditions.checkState(numInstances >= replication, @@ -129,25 +130,25 @@ class SegmentAssignmentUtils { * Rebalances one partition of the table for the replica-group based segment assignment strategy. * <ul> * <li> - * 1. Calculate the target number of segments on each server + * 1. Calculate the target number of segments on each instance * </li> * <li> - * 2. Loop over all the segments and keep the assignment if target number of segments for the server has not been - * reached and track the not assigned segments + * 2. Loop over all the segments and keep the assignment if target number of segments for the instance has not + * been reached and track the not assigned segments * </li> * <li> - * 3. Assign the left-over segments to the servers with the least segments, or the smallest index if there is a + * 3. Assign the left-over segments to the instances with the least segments, or the smallest index if there is a * tie * </li> * <li> - * 4. Mirror the assignment to other replicas + * 4. Mirror the assignment to other replica-groups * </li> * </ul> */ static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, int partitionId, Set<String> segments, Map<String, Map<String, String>> newAssignment) { - // Fetch instances in replica 0 + // Fetch instances in replica-group 0 List<String> instances = instancePartitions.getInstances(partitionId, 0); Map<String, Integer> instanceNameToIdMap = SegmentAssignmentUtils.getInstanceNameToIdMap(instances); @@ -204,9 +205,9 @@ class SegmentAssignmentUtils { private static Map<String, String> getReplicaGroupBasedInstanceStateMap(InstancePartitions instancePartitions, int partitionId, int instanceId) { Map<String, String> instanceStateMap = new TreeMap<>(); - int numReplicas = instancePartitions.getNumReplicas(); - for (int replicaId = 0; replicaId < numReplicas; replicaId++) { - instanceStateMap.put(instancePartitions.getInstances(partitionId, replicaId).get(instanceId), + int numReplicaGroups = instancePartitions.getNumReplicaGroups(); + for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) { + instanceStateMap.put(instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceId), SegmentOnlineOfflineStateModel.ONLINE); } return instanceStateMap; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java index 004ff1f..8daecde 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java @@ -29,7 +29,7 @@ import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.config.Tenant; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; -import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; import org.apache.pinot.common.data.FieldSpec.DataType; import org.apache.pinot.common.data.Schema; @@ -109,12 +109,12 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { // Expected } - // Add OFFLINE instance assignment config to the OFFLINE table config + // Add OFFLINE instance assignment config to the offline table config InstanceAssignmentConfig offlineInstanceAssignmentConfig = new InstanceAssignmentConfig(); InstanceTagPoolConfig offlineInstanceTagPoolConfig = new InstanceTagPoolConfig(); offlineInstanceTagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(TENANT_NAME)); offlineInstanceAssignmentConfig.setTagPoolConfig(offlineInstanceTagPoolConfig); - offlineInstanceAssignmentConfig.setReplicaPartitionConfig(new InstanceReplicaPartitionConfig()); + offlineInstanceAssignmentConfig.setReplicaGroupPartitionConfig(new InstanceReplicaGroupPartitionConfig()); offlineTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.OFFLINE, offlineInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(offlineTableConfig); @@ -124,17 +124,17 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { assertEquals(instancePartitionsMap.size(), 1); InstancePartitions offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); assertNotNull(offlineInstancePartitions); - assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1); assertEquals(offlineInstancePartitions.getNumPartitions(), 1); assertEquals(offlineInstancePartitions.getInstances(0, 0).size(), 1); String offlineInstanceId = offlineInstancePartitions.getInstances(0, 0).get(0); - // Add CONSUMING instance assignment config to the REALTIME table config + // Add CONSUMING instance assignment config to the real-time table config InstanceAssignmentConfig consumingInstanceAssignmentConfig = new InstanceAssignmentConfig(); InstanceTagPoolConfig realtimeInstanceTagPoolConfig = new InstanceTagPoolConfig(); realtimeInstanceTagPoolConfig.setTag(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME)); consumingInstanceAssignmentConfig.setTagPoolConfig(realtimeInstanceTagPoolConfig); - consumingInstanceAssignmentConfig.setReplicaPartitionConfig(new InstanceReplicaPartitionConfig()); + consumingInstanceAssignmentConfig.setReplicaGroupPartitionConfig(new InstanceReplicaGroupPartitionConfig()); realtimeTableConfig.setInstanceAssignmentConfigMap( Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstanceAssignmentConfig)); _helixResourceManager.setExistingTableConfig(realtimeTableConfig); @@ -144,15 +144,15 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { assertEquals(instancePartitionsMap.size(), 2); offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); assertNotNull(offlineInstancePartitions); - assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1); assertEquals(offlineInstancePartitions.getNumPartitions(), 1); assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); InstancePartitions consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); assertNotNull(consumingInstancePartitions); - assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1); assertEquals(consumingInstancePartitions.getNumPartitions(), 1); assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1); - String realtimeInstanceId = consumingInstancePartitions.getInstances(0, 0).get(0); + String consumingInstanceId = consumingInstancePartitions.getInstances(0, 0).get(0); // Use OFFLINE instance assignment config as the COMPLETED instance assignment config realtimeTableConfig @@ -167,16 +167,16 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { assertEquals(instancePartitionsMap.size(), 3); offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); assertNotNull(offlineInstancePartitions); - assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1); assertEquals(offlineInstancePartitions.getNumPartitions(), 1); assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); assertNotNull(consumingInstancePartitions); - assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1); assertEquals(consumingInstancePartitions.getNumPartitions(), 1); - assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(realtimeInstanceId)); + assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId)); InstancePartitions completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); - assertEquals(completedInstancePartitions.getNumReplicas(), 1); + assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1); assertEquals(completedInstancePartitions.getNumPartitions(), 1); assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); @@ -200,7 +200,7 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME)); } - // Remove the instance partitions for both OFFLINE and REALTIME table + // Remove the instance partitions for both offline and real-time table sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME, null)); try { getInstancePartitionsMap(); @@ -215,16 +215,16 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { assertEquals(instancePartitionsMap.size(), 3); offlineInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.OFFLINE); assertNotNull(offlineInstancePartitions); - assertEquals(offlineInstancePartitions.getNumReplicas(), 1); + assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1); assertEquals(offlineInstancePartitions.getNumPartitions(), 1); assertEquals(offlineInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); consumingInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING); assertNotNull(consumingInstancePartitions); - assertEquals(consumingInstancePartitions.getNumReplicas(), 1); + assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1); assertEquals(consumingInstancePartitions.getNumPartitions(), 1); - assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(realtimeInstanceId)); + assertEquals(consumingInstancePartitions.getInstances(0, 0), Collections.singletonList(consumingInstanceId)); completedInstancePartitions = instancePartitionsMap.get(InstancePartitionsType.COMPLETED); - assertEquals(completedInstancePartitions.getNumReplicas(), 1); + assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1); assertEquals(completedInstancePartitions.getNumPartitions(), 1); assertEquals(completedInstancePartitions.getInstances(0, 0), Collections.singletonList(offlineInstanceId)); @@ -236,14 +236,14 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { // Expected } - // Assign instances for both OFFLINE and REALTIME table + // Assign instances for both offline and real-time table sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, null, false), null); // Instance partitions should be persisted instancePartitionsMap = getInstancePartitionsMap(); assertEquals(instancePartitionsMap.size(), 3); - // Remove the instance partitions for REALTIME table + // Remove the instance partitions for real-time table sendDeleteRequest(_controllerRequestURLBuilder .forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME), null)); instancePartitionsMap = getInstancePartitionsMap(); @@ -262,19 +262,19 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE)); assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); - // Replace OFFLINE instance with REALTIME instance for COMPLETED instance partitions + // Replace OFFLINE instance with CONSUMING instance for COMPLETED instance partitions instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(_controllerRequestURLBuilder - .forInstanceReplace(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, offlineInstanceId, realtimeInstanceId), + .forInstanceReplace(RAW_TABLE_NAME, InstancePartitionsType.COMPLETED, offlineInstanceId, consumingInstanceId), null)); assertEquals(instancePartitionsMap.size(), 1); assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0), - Collections.singletonList(realtimeInstanceId)); + Collections.singletonList(consumingInstanceId)); - // Replace the instance again using REALTIME table name (old instance does not exist) + // Replace the instance again using real-time table name (old instance does not exist) try { sendPostRequest(_controllerRequestURLBuilder .forInstanceReplace(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME), null, offlineInstanceId, - realtimeInstanceId), null); + consumingInstanceId), null); fail(); } catch (FileNotFoundException e) { // Expected @@ -286,27 +286,27 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest { consumingInstancePartitions.toJsonString())); assertEquals(instancePartitionsMap.size(), 1); assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0), - Collections.singletonList(realtimeInstanceId)); + Collections.singletonList(consumingInstanceId)); // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and COMPLETED instance partitions should have - // REALTIME instance + // CONSUMING instance instancePartitionsMap = getInstancePartitionsMap(); assertEquals(instancePartitionsMap.size(), 3); assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0, 0), Collections.singletonList(offlineInstanceId)); assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0, 0), - Collections.singletonList(realtimeInstanceId)); + Collections.singletonList(consumingInstanceId)); assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0, 0), - Collections.singletonList(realtimeInstanceId)); + Collections.singletonList(consumingInstanceId)); - // Delete the OFFLINE table + // Delete the offline table _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); instancePartitionsMap = getInstancePartitionsMap(); assertEquals(instancePartitionsMap.size(), 2); assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING)); assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED)); - // Delete the REALTIME table + // Delete the real-time table _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME); try { getInstancePartitionsMap(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index fec7a73..9a7e73e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -31,7 +31,7 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.config.instance.InstanceAssignmentConfig; import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils; -import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig; +import org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig; import org.apache.pinot.common.config.instance.InstanceTagPoolConfig; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy; @@ -57,9 +57,9 @@ public class InstanceAssignmentTest { new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME) .setNumReplicas(numReplicas) .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build(); - int numInstancesPerReplica = 2; + int numInstancesPerPartition = 2; ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(); - replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerReplica); + replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition); tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); int numInstances = 10; @@ -70,9 +70,9 @@ public class InstanceAssignmentTest { instanceConfigs.add(instanceConfig); } - // Instances should be assigned to 3 replicas with a round-robin fashion, each with 2 instances + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), 1); // Instances of index 4 to 7 are not assigned because of the hash-based rotation // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 @@ -92,10 +92,10 @@ public class InstanceAssignmentTest { Collections.singletonMap(partitionColumnName, new ColumnPartitionConfig("Modulo", numPartitions))); tableConfig.getIndexingConfig().setSegmentPartitionConfig(segmentPartitionConfig); - // Instances should be assigned to 3 replicas with a round-robin fashion, each with 3 instances, then these 3 + // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3 // instances should be assigned to 2 partitions, each with 2 instances instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas); assertEquals(instancePartitions.getNumPartitions(), numPartitions); // Instance of index 7 is not assigned because of the hash-based rotation // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8 @@ -147,12 +147,12 @@ public class InstanceAssignmentTest { tagPoolConfig.setPoolBased(true); tagPoolConfig.setNumPools(numPools); assignmentConfig.setTagPoolConfig(tagPoolConfig); - // Assign to 2 replicas so that each replica is assigned to one pool - int numReplicas = numPools; - InstanceReplicaPartitionConfig replicaPartitionConfig = new InstanceReplicaPartitionConfig(); + // Assign to 2 replica-groups so that each replica-group is assigned to one pool + int numReplicaGroups = numPools; + InstanceReplicaGroupPartitionConfig replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(); replicaPartitionConfig.setReplicaGroupBased(true); - replicaPartitionConfig.setNumReplicas(numReplicas); - assignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig); + replicaPartitionConfig.setNumReplicaGroups(numReplicaGroups); + assignmentConfig.setReplicaGroupPartitionConfig(replicaPartitionConfig); TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, assignmentConfig)) @@ -160,10 +160,10 @@ public class InstanceAssignmentTest { InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 - // All instances in pool 0 should be assigned to replica 0, and all instances in pool 1 should be assigned to - // replica 1 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, @@ -187,10 +187,10 @@ public class InstanceAssignmentTest { // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // Pool 0 and 2 will be selected in the pool selection - // All instances in pool 0 should be assigned to replica 0, and all instances in pool 2 should be assigned to - // replica 1 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to + // replica-group 1 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, @@ -203,10 +203,10 @@ public class InstanceAssignmentTest { tagPoolConfig.setNumPools(numPools); // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2 - // All instances in pool 2 should be assigned to replica 0, and all instances in pool 0 should be assigned to - // replica 1 + // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to + // replica-group 1 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 12, @@ -219,10 +219,10 @@ public class InstanceAssignmentTest { tagPoolConfig.setPools(Arrays.asList(0, 1)); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 - // All instances in pool 0 should be assigned to replica 0, and all instances in pool 1 should be assigned to - // replica 1 + // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to + // replica-group 1 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 2, @@ -231,22 +231,22 @@ public class InstanceAssignmentTest { .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9)); - // Assign instances from 2 pools to 3 replicas - numReplicas = numPools; - replicaPartitionConfig.setNumReplicas(numReplicas); + // Assign instances from 2 pools to 3 replica-groups + numReplicaGroups = numPools; + replicaPartitionConfig.setNumReplicaGroups(numReplicaGroups); // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 // [pool0, pool1] // r0 r1 - // r0 - // Each replica should have 2 instances assigned + // r2 + // Each replica-group should have 2 instances assigned // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 // pool 0: [i3, i4, i0, i1, i2] // r0 r2 r0 r2 // pool 1: [i8, i9, i5, i6, i7] // r1 r1 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), numReplicas); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); @@ -314,20 +314,20 @@ public class InstanceAssignmentTest { instanceConfig.addTag(OFFLINE_TAG); } - // No instance replica/partition config + // No instance replica-group/partition config try { driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); fail(); } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "Instance replica/partition config is missing"); + assertEquals(e.getMessage(), "Instance replica-group/partition config is missing"); } - InstanceReplicaPartitionConfig replicaPartitionConfig = new InstanceReplicaPartitionConfig(); - assignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig); + InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(); + assignmentConfig.setReplicaGroupPartitionConfig(replicaGroupPartitionConfig); - // All instances should be assigned as replica 0 partition 0 + // All instances should be assigned as replica-group 0 partition 0 InstancePartitions instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 1); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); List<String> expectedInstances = new ArrayList<>(numInstances); for (int i = 0; i < numInstances; i++) { @@ -356,9 +356,9 @@ public class InstanceAssignmentTest { } // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0 - // All instances in pool 0 should be assigned as replica 0 partition 0 + // All instances in pool 0 should be assigned as replica-group 0 partition 0 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 1); + assertEquals(instancePartitions.getNumReplicaGroups(), 1); assertEquals(instancePartitions.getNumPartitions(), 1); expectedInstances.clear(); for (int i = 0; i < numInstances / 2; i++) { @@ -388,7 +388,7 @@ public class InstanceAssignmentTest { } tagPoolConfig.setPools(null); - replicaPartitionConfig.setNumInstances(6); + replicaGroupPartitionConfig.setNumInstances(6); // Ask for too many instances try { @@ -398,32 +398,32 @@ public class InstanceAssignmentTest { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); } - replicaPartitionConfig.setNumInstances(0); + replicaGroupPartitionConfig.setNumInstances(0); // Enable replica-group - replicaPartitionConfig.setReplicaGroupBased(true); + replicaGroupPartitionConfig.setReplicaGroupBased(true); - // Number of replicas must be positive + // Number of replica-groups must be positive try { driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); fail(); } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "Number of replicas must be positive"); + assertEquals(e.getMessage(), "Number of replica-groups must be positive"); } - replicaPartitionConfig.setNumReplicas(11); + replicaGroupPartitionConfig.setNumReplicaGroups(11); - // Ask for too many replicas + // Ask for too many replica-groups try { driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), - "Not enough qualified instances from pool: 0, cannot select 6 replicas from 5 instances"); + "Not enough qualified instances from pool: 0, cannot select 6 replica-groups from 5 instances"); } - replicaPartitionConfig.setNumReplicas(3); - replicaPartitionConfig.setNumInstancesPerReplica(3); + replicaGroupPartitionConfig.setNumReplicaGroups(3); + replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(3); // Ask for too many instances try { @@ -433,8 +433,8 @@ public class InstanceAssignmentTest { assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)"); } - replicaPartitionConfig.setNumInstancesPerReplica(2); - replicaPartitionConfig.setNumInstancesPerPartition(3); + replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(2); + replicaGroupPartitionConfig.setNumInstancesPerPartition(3); // Ask for too many instances per partition try { @@ -442,10 +442,10 @@ public class InstanceAssignmentTest { fail(); } catch (IllegalStateException e) { assertEquals(e.getMessage(), - "Number of instances per partition: 3 must be smaller or equal to number of instances per replica: 2"); + "Number of instances per partition: 3 must be smaller or equal to number of instances per replica-group: 2"); } - replicaPartitionConfig.setNumInstancesPerPartition(0); + replicaGroupPartitionConfig.setNumInstancesPerPartition(0); // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3 // pool0: [i3, i4, i0, i1, i2] @@ -453,7 +453,7 @@ public class InstanceAssignmentTest { // pool1: [i8, i9, i5, i6, i7] // r1 r1 instancePartitions = driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs); - assertEquals(instancePartitions.getNumReplicas(), 3); + assertEquals(instancePartitions.getNumReplicaGroups(), 3); assertEquals(instancePartitions.getNumPartitions(), 1); assertEquals(instancePartitions.getInstances(0, 0), Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3)); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java index 089c016..6b26f8e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java @@ -93,14 +93,14 @@ public class OfflineReplicaGroupSegmentAssignmentTest { // } InstancePartitions instancePartitionsWithoutPartition = new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION); - int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS; int instanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); - for (int i = 0; i < numInstancesPerReplica; i++) { - instancesForReplica.add(INSTANCES.get(instanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> instancesForReplicaGroup = new ArrayList<>(numInstancesPerReplicaGroup); + for (int i = 0; i < numInstancesPerReplicaGroup; i++) { + instancesForReplicaGroup.add(INSTANCES.get(instanceIdToAdd++)); } - instancePartitionsWithoutPartition.setInstances(0, replicaId, instancesForReplica); + instancePartitionsWithoutPartition.setInstances(0, replicaGroupId, instancesForReplicaGroup); } _instancePartitionsMapWithoutPartition = Collections.singletonMap(InstancePartitionsType.OFFLINE, instancePartitionsWithoutPartition); @@ -128,13 +128,13 @@ public class OfflineReplicaGroupSegmentAssignmentTest { HelixManager helixManagerWithPartitions = mock(HelixManager.class); when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions); - ReplicaGroupStrategyConfig strategyConfig = new ReplicaGroupStrategyConfig(); - strategyConfig.setPartitionColumn(PARTITION_COLUMN); + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(); + replicaGroupStrategyConfig.setPartitionColumn(PARTITION_COLUMN); TableConfig tableConfigWithPartitions = new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION) .setNumReplicas(NUM_REPLICAS) .setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build(); - tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig); + tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); _segmentAssignmentWithPartition = SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, tableConfigWithPartitions); @@ -151,15 +151,15 @@ public class OfflineReplicaGroupSegmentAssignmentTest { // } InstancePartitions instancePartitionsWithPartition = new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION); - int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS; + int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_REPLICAS; instanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) { List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition); for (int i = 0; i < numInstancesPerPartition; i++) { instancesForPartition.add(INSTANCES.get(instanceIdToAdd++)); } - instancePartitionsWithPartition.setInstances(partitionId, replicaId, instancesForPartition); + instancePartitionsWithPartition.setInstances(partitionId, replicaGroupId, instancesForPartition); } } _instancePartitionsMapWithPartition = @@ -174,27 +174,29 @@ public class OfflineReplicaGroupSegmentAssignmentTest { @Test public void testAssignSegmentWithoutPartition() { - int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = SEGMENTS.get(segmentId); List<String> instancesAssigned = _segmentAssignmentWithoutPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithoutPartition); assertEquals(instancesAssigned.size(), NUM_REPLICAS); - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - // Segment 0 should be assigned to instance 0, 6, 12 - // Segment 1 should be assigned to instance 1, 7, 13 - // Segment 2 should be assigned to instance 2, 8, 14 - // Segment 3 should be assigned to instance 3, 9, 15 - // Segment 4 should be assigned to instance 4, 10, 16 - // Segment 5 should be assigned to instance 5, 11, 17 - // Segment 6 should be assigned to instance 0, 6, 12 - // Segment 7 should be assigned to instance 1, 7, 13 - // ... - int expectedAssignedInstanceId = segmentId % numInstancesPerReplica + replicaId * numInstancesPerReplica; - assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); + // Segment 0 should be assigned to instance 0, 6, 12 + // Segment 1 should be assigned to instance 1, 7, 13 + // Segment 2 should be assigned to instance 2, 8, 14 + // Segment 3 should be assigned to instance 3, 9, 15 + // Segment 4 should be assigned to instance 4, 10, 16 + // Segment 5 should be assigned to instance 5, 11, 17 + // Segment 6 should be assigned to instance 0, 6, 12 + // Segment 7 should be assigned to instance 1, 7, 13 + // ... + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + int expectedAssignedInstanceId = + segmentId % numInstancesPerReplicaGroup + replicaGroupId * numInstancesPerReplicaGroup; + assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId)); } + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); } @@ -202,31 +204,32 @@ public class OfflineReplicaGroupSegmentAssignmentTest { @Test public void testAssignSegmentWithPartition() { - int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); - int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS; + int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_PARTITIONS; for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = SEGMENTS.get(segmentId); List<String> instancesAssigned = _segmentAssignmentWithPartition .assignSegment(segmentName, currentAssignment, _instancePartitionsMapWithPartition); assertEquals(instancesAssigned.size(), NUM_REPLICAS); - int partitionId = segmentId % NUM_PARTITIONS; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - // Segment 0 (partition 0) should be assigned to instance 0, 6, 12 - // Segment 1 (partition 1) should be assigned to instance 2, 8, 14 - // Segment 2 (partition 2) should be assigned to instance 4, 10, 16 - // Segment 3 (partition 0) should be assigned to instance 1, 7, 13 - // Segment 4 (partition 1) should be assigned to instance 3, 9, 15 - // Segment 5 (partition 2) should be assigned to instance 5, 11, 17 - // Segment 6 (partition 0) should be assigned to instance 0, 6, 12 - // Segment 7 (partition 1) should be assigned to instance 2, 8, 14 - // ... + // Segment 0 (partition 0) should be assigned to instance 0, 6, 12 + // Segment 1 (partition 1) should be assigned to instance 2, 8, 14 + // Segment 2 (partition 2) should be assigned to instance 4, 10, 16 + // Segment 3 (partition 0) should be assigned to instance 1, 7, 13 + // Segment 4 (partition 1) should be assigned to instance 3, 9, 15 + // Segment 5 (partition 2) should be assigned to instance 5, 11, 17 + // Segment 6 (partition 0) should be assigned to instance 0, 6, 12 + // Segment 7 (partition 1) should be assigned to instance 2, 8, 14 + // ... + int partitionId = segmentId % NUM_PARTITIONS; + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { int expectedAssignedInstanceId = - (segmentId % numInstancesPerReplica) / NUM_PARTITIONS + partitionId * numInstancesPerPartition - + replicaId * numInstancesPerReplica; - assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); + (segmentId % numInstancesPerReplicaGroup) / NUM_PARTITIONS + partitionId * numInstancesPerPartition + + replicaGroupId * numInstancesPerReplicaGroup; + assertEquals(instancesAssigned.get(replicaGroupId), INSTANCES.get(expectedAssignedInstanceId)); } + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 5fee01a..86bd4ca 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -105,19 +105,20 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { List<String> instancesAssigned = _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - // Segment 0 (partition 0) should be assigned to instance 0, 1, 2 - // Segment 1 (partition 1) should be assigned to instance 3, 4, 5 - // Segment 2 (partition 2) should be assigned to instance 6, 7, 8 - // Segment 3 (partition 3) should be assigned to instance 0, 1, 2 - // Segment 4 (partition 0) should be assigned to instance 0, 1, 2 - // Segment 5 (partition 1) should be assigned to instance 3, 4, 5 - // ... - int partitionId = segmentId % NUM_PARTITIONS; + // Segment 0 (partition 0) should be assigned to instance 0, 1, 2 + // Segment 1 (partition 1) should be assigned to instance 3, 4, 5 + // Segment 2 (partition 2) should be assigned to instance 6, 7, 8 + // Segment 3 (partition 3) should be assigned to instance 0, 1, 2 + // Segment 4 (partition 0) should be assigned to instance 0, 1, 2 + // Segment 5 (partition 1) should be assigned to instance 3, 4, 5 + // ... + int partitionId = segmentId % NUM_PARTITIONS; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { int expectedAssignedInstanceId = (partitionId * NUM_REPLICAS + replicaId) % NUM_CONSUMING_INSTANCES; assertEquals(instancesAssigned.get(replicaId), CONSUMING_INSTANCES.get(expectedAssignedInstanceId)); } + addToAssignment(currentAssignment, segmentId, instancesAssigned); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index 04fc344..0fc60df 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -85,14 +85,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { // p0 p1 p2 // p3 InstancePartitions consumingInstancePartitions = new InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME); - int numConsumingInstancesPerReplica = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; + int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; int consumingInstanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> consumingInstancesForReplica = new ArrayList<>(numConsumingInstancesPerReplica); - for (int i = 0; i < numConsumingInstancesPerReplica; i++) { - consumingInstancesForReplica.add(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> consumingInstancesForReplicaGroup = new ArrayList<>(numConsumingInstancesPerReplicaGroup); + for (int i = 0; i < numConsumingInstancesPerReplicaGroup; i++) { + consumingInstancesForReplicaGroup.add(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++)); } - consumingInstancePartitions.setInstances(0, replicaId, consumingInstancesForReplica); + consumingInstancePartitions.setInstances(0, replicaGroupId, consumingInstancesForReplicaGroup); } _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions); @@ -103,14 +103,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { // 0_2=[instance_8, instance_9, instance_10, instance_11] // } InstancePartitions completedInstancePartitions = new InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME); - int numCompletedInstancesPerReplica = NUM_COMPLETED_INSTANCES / NUM_REPLICAS; + int numCompletedInstancesPerReplicaGroup = NUM_COMPLETED_INSTANCES / NUM_REPLICAS; int completedInstanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> completedInstancesForReplica = new ArrayList<>(numCompletedInstancesPerReplica); - for (int i = 0; i < numCompletedInstancesPerReplica; i++) { - completedInstancesForReplica.add(COMPLETED_INSTANCES.get(completedInstanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> completedInstancesForReplicaGroup = new ArrayList<>(numCompletedInstancesPerReplicaGroup); + for (int i = 0; i < numCompletedInstancesPerReplicaGroup; i++) { + completedInstancesForReplicaGroup.add(COMPLETED_INSTANCES.get(completedInstanceIdToAdd++)); } - completedInstancePartitions.setInstances(0, replicaId, completedInstancesForReplica); + completedInstancePartitions.setInstances(0, replicaGroupId, completedInstancesForReplicaGroup); } _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions); } @@ -122,26 +122,28 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { @Test public void testAssignSegment() { - int numInstancesPerReplica = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; + int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { String segmentName = _segments.get(segmentId); List<String> instancesAssigned = _segmentAssignment.assignSegment(segmentName, currentAssignment, _instancePartitionsMap); assertEquals(instancesAssigned.size(), NUM_REPLICAS); - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - - // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 - // Segment 1 (partition 1) should be assigned to instance 1, 4, 7 - // Segment 2 (partition 2) should be assigned to instance 2, 5, 8 - // Segment 3 (partition 3) should be assigned to instance 0, 3, 6 - // Segment 4 (partition 0) should be assigned to instance 0, 3, 6 - // Segment 5 (partition 1) should be assigned to instance 1, 4, 7 - // ... + + // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 + // Segment 1 (partition 1) should be assigned to instance 1, 4, 7 + // Segment 2 (partition 2) should be assigned to instance 2, 5, 8 + // Segment 3 (partition 3) should be assigned to instance 0, 3, 6 + // Segment 4 (partition 0) should be assigned to instance 0, 3, 6 + // Segment 5 (partition 1) should be assigned to instance 1, 4, 7 + // ... + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { int partitionId = segmentId % NUM_PARTITIONS; - int expectedAssignedInstanceId = partitionId % numInstancesPerReplica + replicaId * numInstancesPerReplica; - assertEquals(instancesAssigned.get(replicaId), CONSUMING_INSTANCES.get(expectedAssignedInstanceId)); + int expectedAssignedInstanceId = + partitionId % numInstancesPerReplicaGroup + replicaGroupId * numInstancesPerReplicaGroup; + assertEquals(instancesAssigned.get(replicaGroupId), CONSUMING_INSTANCES.get(expectedAssignedInstanceId)); } + addToAssignment(currentAssignment, segmentId, instancesAssigned); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java index 86cfac6..ef746e8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java @@ -217,28 +217,28 @@ public class SegmentAssignmentUtilsTest { // 0_2=[instance_6, instance_7, instance_8] // } InstancePartitions instancePartitions = new InstancePartitions(null); - int numInstancesPerReplica = numInstances / NUM_REPLICAS; + int numInstancesPerReplicaGroup = numInstances / NUM_REPLICAS; int instanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); - for (int i = 0; i < numInstancesPerReplica; i++) { - instancesForReplica.add(instances.get(instanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> instancesForReplicaGroup = new ArrayList<>(numInstancesPerReplicaGroup); + for (int i = 0; i < numInstancesPerReplicaGroup; i++) { + instancesForReplicaGroup.add(instances.get(instanceIdToAdd++)); } - instancePartitions.setInstances(0, replicaId, instancesForReplica); + instancePartitions.setInstances(0, replicaGroupId, instancesForReplicaGroup); } // Uniformly spray segments to the instances: - // Replica group 0: [instance_0, instance_1, instance_2], - // Replica group 1: [instance_3, instance_4, instance_5], - // Replica group 2: [instance_6, instance_7, instance_8] + // Replica-group 0: [instance_0, instance_1, instance_2], + // Replica-group 1: [instance_3, instance_4, instance_5], + // Replica-group 2: [instance_6, instance_7, instance_8] // segment_0 segment_1 segment_2 // segment_3 segment_4 segment_5 // ... Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); for (int segmentId = 0; segmentId < numSegments; segmentId++) { List<String> instancesAssigned = new ArrayList<>(NUM_REPLICAS); - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - int assignedInstanceId = segmentId % numInstancesPerReplica + replicaId * numInstancesPerReplica; + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + int assignedInstanceId = segmentId % numInstancesPerReplicaGroup + replicaGroupId * numInstancesPerReplicaGroup; instancesAssigned.add(instances.get(assignedInstanceId)); } currentAssignment.put(segments.get(segmentId), @@ -270,20 +270,20 @@ public class SegmentAssignmentUtilsTest { // 0_2=[instance_6, instance_7, instance_8] // } List<String> newInstances = new ArrayList<>(numInstances); - List<String> newReplica0Instances = new ArrayList<>(instancePartitions.getInstances(0, 0)); - String newReplica0Instance = INSTANCE_NAME_PREFIX + 9; - newReplica0Instances.set(0, newReplica0Instance); - newInstances.addAll(newReplica0Instances); - List<String> newReplica1Instances = new ArrayList<>(instancePartitions.getInstances(0, 1)); - String newReplica1Instance = INSTANCE_NAME_PREFIX + 10; - newReplica1Instances.set(1, newReplica1Instance); - newInstances.addAll(newReplica1Instances); - List<String> newReplica2Instances = instancePartitions.getInstances(0, 2); - newInstances.addAll(newReplica2Instances); + List<String> newReplicaGroup0Instances = new ArrayList<>(instancePartitions.getInstances(0, 0)); + String newReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 9; + newReplicaGroup0Instances.set(0, newReplicaGroup0Instance); + newInstances.addAll(newReplicaGroup0Instances); + List<String> newReplicaGroup1Instances = new ArrayList<>(instancePartitions.getInstances(0, 1)); + String newReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 10; + newReplicaGroup1Instances.set(1, newReplicaGroup1Instance); + newInstances.addAll(newReplicaGroup1Instances); + List<String> newReplicaGroup2Instances = instancePartitions.getInstances(0, 2); + newInstances.addAll(newReplicaGroup2Instances); InstancePartitions newInstancePartitions = new InstancePartitions(null); - newInstancePartitions.setInstances(0, 0, newReplica0Instances); - newInstancePartitions.setInstances(0, 1, newReplica1Instances); - newInstancePartitions.setInstances(0, 2, newReplica2Instances); + newInstancePartitions.setInstances(0, 0, newReplicaGroup0Instances); + newInstancePartitions.setInstances(0, 1, newReplicaGroup1Instances); + newInstancePartitions.setInstances(0, 2, newReplicaGroup2Instances); Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); // There should be 90 segments assigned @@ -301,34 +301,34 @@ public class SegmentAssignmentUtilsTest { Map<String, Integer> numSegmentsToBeMovedPerInstance = SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); assertEquals(numSegmentsToBeMovedPerInstance.size(), 2); - assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplica0Instance), numSegmentsPerInstance); - assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplica1Instance), numSegmentsPerInstance); - String replica0OldInstanceName = INSTANCE_NAME_PREFIX + 0; - String replica1OldInstanceName = INSTANCE_NAME_PREFIX + 4; + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplicaGroup0Instance), numSegmentsPerInstance); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplicaGroup1Instance), numSegmentsPerInstance); + String oldReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 0; + String oldReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 4; for (String segmentName : segments) { Map<String, String> oldInstanceStateMap = currentAssignment.get(segmentName); - if (oldInstanceStateMap.containsKey(replica0OldInstanceName)) { - assertTrue(newAssignment.get(segmentName).containsKey(newReplica0Instance)); + if (oldInstanceStateMap.containsKey(oldReplicaGroup0Instance)) { + assertTrue(newAssignment.get(segmentName).containsKey(newReplicaGroup0Instance)); } - if (oldInstanceStateMap.containsKey(replica1OldInstanceName)) { - assertTrue(newAssignment.get(segmentName).containsKey(newReplica1Instance)); + if (oldInstanceStateMap.containsKey(oldReplicaGroup1Instance)) { + assertTrue(newAssignment.get(segmentName).containsKey(newReplicaGroup1Instance)); } } - // Remove 3 instances (1 from each replica) + // Remove 3 instances (1 from each replica-group) // { // 0_0=[instance_0, instance_1], // 0_1=[instance_3, instance_4], // 0_2=[instance_6, instance_7] // } int newNumInstances = numInstances - 3; - int newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS; + int newNumInstancesPerReplicaGroup = newNumInstances / NUM_REPLICAS; newInstances = new ArrayList<>(newNumInstances); - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> newInstancesForReplica = - instancePartitions.getInstances(0, replicaId).subList(0, newNumInstancesPerReplica); - newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica); - newInstances.addAll(newInstancesForReplica); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> newInstancesForReplicaGroup = + instancePartitions.getInstances(0, replicaGroupId).subList(0, newNumInstancesPerReplicaGroup); + newInstancePartitions.setInstances(0, replicaGroupId, newInstancesForReplicaGroup); + newInstances.addAll(newInstancesForReplicaGroup); } newAssignment = SegmentAssignmentUtils .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); @@ -354,22 +354,22 @@ public class SegmentAssignmentUtilsTest { newNumSegmentsPerInstance - numSegmentsPerInstance); } - // Add 6 instances (2 to each replica) + // Add 6 instances (2 to each replica-group) // { // 0_0=[instance_0, instance_1, instance_2, instance_9, instance_10], // 0_1=[instance_3, instance_4, instance_5, instance_11, instance_12], // 0_2=[instance_6, instance_7, instance_8, instance_13, instance_14] // } newNumInstances = numInstances + 6; - newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS; + newNumInstancesPerReplicaGroup = newNumInstances / NUM_REPLICAS; newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); instanceIdToAdd = numInstances; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> newInstancesForReplica = new ArrayList<>(instancePartitions.getInstances(0, replicaId)); - for (int i = 0; i < newNumInstancesPerReplica - numInstancesPerReplica; i++) { - newInstancesForReplica.add(newInstances.get(instanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> newInstancesForReplicaGroup = new ArrayList<>(instancePartitions.getInstances(0, replicaGroupId)); + for (int i = 0; i < newNumInstancesPerReplicaGroup - numInstancesPerReplicaGroup; i++) { + newInstancesForReplicaGroup.add(newInstances.get(instanceIdToAdd++)); } - newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica); + newInstancePartitions.setInstances(0, replicaGroupId, newInstancesForReplicaGroup); } newAssignment = SegmentAssignmentUtils .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); @@ -402,12 +402,12 @@ public class SegmentAssignmentUtilsTest { // } newInstances = SegmentAssignmentTestUtils.getNameList("i_", numInstances); instanceIdToAdd = 0; - for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { - List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); - for (int i = 0; i < numInstancesPerReplica; i++) { - instancesForReplica.add(newInstances.get(instanceIdToAdd++)); + for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; replicaGroupId++) { + List<String> instancesForReplicaGroup = new ArrayList<>(numInstancesPerReplicaGroup); + for (int i = 0; i < numInstancesPerReplicaGroup; i++) { + instancesForReplicaGroup.add(newInstances.get(instanceIdToAdd++)); } - newInstancePartitions.setInstances(0, replicaId, instancesForReplica); + newInstancePartitions.setInstances(0, replicaGroupId, instancesForReplicaGroup); } newAssignment = SegmentAssignmentUtils .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org