This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch rename_replica_group
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 46bb4585b274b548b353441f845595c3d83d9e74
Author: Jackie (Xiaotian) Jiang <xaji...@linkedin.com>
AuthorDate: Mon Aug 19 14:26:54 2019 -0700

    [Instance Assignment] Rename instance level replica to replica-group
    
    To reduce confusion in the code, use the following terminology:
    - Replica-group: a set of instances that serves one replica of all the
      segments
    - Replica: one copy of a segment
---
 .../config/instance/InstanceAssignmentConfig.java  |  14 +--
 .../instance/InstanceAssignmentConfigUtils.java    |  16 +--
 ...va => InstanceReplicaGroupPartitionConfig.java} |  32 +++---
 .../pinot/common/config/TableConfigTest.java       |  27 ++---
 .../helix/core/assignment/InstancePartitions.java  |  33 +++---
 .../instance/InstanceAssignmentDriver.java         |  12 +-
 ... => InstanceReplicaGroupPartitionSelector.java} | 127 +++++++++++----------
 .../segment/OfflineSegmentAssignment.java          |  58 +++++-----
 .../segment/RealtimeSegmentAssignment.java         |  39 ++++---
 .../assignment/segment/SegmentAssignmentUtils.java |  25 ++--
 ...PinotInstanceAssignmentRestletResourceTest.java |  62 +++++-----
 .../instance/InstanceAssignmentTest.java           | 106 ++++++++---------
 .../OfflineReplicaGroupSegmentAssignmentTest.java  |  85 +++++++-------
 ...altimeNonReplicaGroupSegmentAssignmentTest.java |  19 +--
 .../RealtimeReplicaGroupSegmentAssignmentTest.java |  50 ++++----
 .../segment/SegmentAssignmentUtilsTest.java        | 102 ++++++++---------
 16 files changed, 414 insertions(+), 393 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java
index 9aa5a9c..6d08f21 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfig.java
@@ -38,10 +38,10 @@ public class InstanceAssignmentConfig {
   @NestedConfig
   private InstanceConstraintConfig _constraintConfig;
 
-  @ConfigKey("replicaPartitionConfig")
-  @ConfigDoc(value = "Configuration for the instance replica and partition of 
the instance assignment", mandatory = true)
+  @ConfigKey("replicaGroupPartitionConfig")
+  @ConfigDoc(value = "Configuration for the instance replica-group and 
partition of the instance assignment", mandatory = true)
   @NestedConfig
-  private InstanceReplicaPartitionConfig _replicaPartitionConfig;
+  private InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
 
   @JsonProperty
   public InstanceTagPoolConfig getTagPoolConfig() {
@@ -64,12 +64,12 @@ public class InstanceAssignmentConfig {
   }
 
   @JsonProperty
-  public InstanceReplicaPartitionConfig getReplicaPartitionConfig() {
-    return _replicaPartitionConfig;
+  public InstanceReplicaGroupPartitionConfig getReplicaGroupPartitionConfig() {
+    return _replicaGroupPartitionConfig;
   }
 
   @JsonProperty
-  public void setReplicaPartitionConfig(InstanceReplicaPartitionConfig 
replicaPartitionConfig) {
-    _replicaPartitionConfig = replicaPartitionConfig;
+  public void 
setReplicaGroupPartitionConfig(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig) {
+    _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
index 446f107..91c960c 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceAssignmentConfigUtils.java
@@ -42,14 +42,14 @@ public class InstanceAssignmentConfigUtils {
     Map<InstancePartitionsType, InstanceAssignmentConfig> 
instanceAssignmentConfigMap =
         tableConfig.getInstanceAssignmentConfigMap();
     switch (instancePartitionsType) {
-      // Allow OFFLINE instance assignment if the OFFLINE table has it 
configured or (for backward-compatibility) is
+      // Allow OFFLINE instance assignment if the offline table has it 
configured or (for backward-compatibility) is
       // using replica-group segment assignment
       case OFFLINE:
         return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap 
!= null && instanceAssignmentConfigMap
             .containsKey(InstancePartitionsType.OFFLINE))
             || AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
             
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
-      // Allow CONSUMING/COMPLETED instance assignment if the REALTIME table 
has it configured
+      // Allow CONSUMING/COMPLETED instance assignment if the real-time table 
has it configured
       case CONSUMING:
       case COMPLETED:
         return tableType == TableType.REALTIME && (instanceAssignmentConfigMap 
!= null && instanceAssignmentConfigMap
@@ -77,17 +77,17 @@ public class InstanceAssignmentConfigUtils {
     }
 
     // Generate default instance assignment config if it does not exist
-    // Only allow default config for OFFLINE table with replica-group segment 
assignment for backward-compatibility
+    // Only allow default config for offline table with replica-group segment 
assignment for backward-compatibility
     InstanceAssignmentConfig instanceAssignmentConfig = new 
InstanceAssignmentConfig();
 
     InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig();
     
tagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(tableConfig.getTenantConfig().getServer()));
     instanceAssignmentConfig.setTagPoolConfig(tagPoolConfig);
 
-    InstanceReplicaPartitionConfig replicaPartitionConfig = new 
InstanceReplicaPartitionConfig();
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig = new 
InstanceReplicaGroupPartitionConfig();
     replicaPartitionConfig.setReplicaGroupBased(true);
     SegmentsValidationAndRetentionConfig segmentConfig = 
tableConfig.getValidationConfig();
-    
replicaPartitionConfig.setNumReplicas(segmentConfig.getReplicationNumber());
+    
replicaPartitionConfig.setNumReplicaGroups(segmentConfig.getReplicationNumber());
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig = 
segmentConfig.getReplicaGroupStrategyConfig();
     Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to 
find the replica-group strategy config");
     String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
@@ -99,10 +99,10 @@ public class InstanceAssignmentConfigUtils {
       
replicaPartitionConfig.setNumInstancesPerPartition(replicaGroupStrategyConfig.getNumInstancesPerPartition());
     } else {
       // If partition column is not configured, use 
replicaGroupStrategyConfig.getNumInstancesPerPartition() as
-      // number of instances per replica for backward-compatibility
-      
replicaPartitionConfig.setNumInstancesPerReplica(replicaGroupStrategyConfig.getNumInstancesPerPartition());
+      // number of instances per replica-group for backward-compatibility
+      
replicaPartitionConfig.setNumInstancesPerReplicaGroup(replicaGroupStrategyConfig.getNumInstancesPerPartition());
     }
-    instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig);
+    
instanceAssignmentConfig.setReplicaGroupPartitionConfig(replicaPartitionConfig);
 
     return instanceAssignmentConfig;
   }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java
similarity index 75%
rename from 
pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java
index 65d1e49..dadfd8a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaPartitionConfig.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/instance/InstanceReplicaGroupPartitionConfig.java
@@ -25,7 +25,7 @@ import org.apache.pinot.common.config.ConfigKey;
 
 
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class InstanceReplicaPartitionConfig {
+public class InstanceReplicaGroupPartitionConfig {
 
   @ConfigKey("replicaGroupBased")
   @ConfigDoc("Whether to use replica-group based selection, false by default")
@@ -35,20 +35,20 @@ public class InstanceReplicaPartitionConfig {
   @ConfigDoc("Number of instances to select for non-replica-group based 
selection, select all instances if not specified")
   private int _numInstances;
 
-  @ConfigKey("numReplicas")
-  @ConfigDoc("Number of replicas (replica-groups) for replica-group based 
selection")
-  private int _numReplicas;
+  @ConfigKey("numReplicaGroups")
+  @ConfigDoc("Number of replica-groups for replica-group based selection")
+  private int _numReplicaGroups;
 
-  @ConfigKey("numServersPerReplica")
-  @ConfigDoc("Number of instances per replica for replica-group based 
selection, select as many instances as possible if not specified")
-  private int _numInstancesPerReplica;
+  @ConfigKey("numServersPerReplicaGroup")
+  @ConfigDoc("Number of instances per replica-group for replica-group based 
selection, select as many instances as possible if not specified")
+  private int _numInstancesPerReplicaGroup;
 
   @ConfigKey("numPartitions")
   @ConfigDoc("Number of partitions for replica-group based selection, do not 
partition the replica-group (1 partition) if not specified")
   private int _numPartitions;
 
   @ConfigKey("numServersPerPartition")
-  @ConfigDoc("Number of instances per partition (within a replica) for 
replica-group based selection, select all instances if not specified")
+  @ConfigDoc("Number of instances per partition (within a replica-group) for 
replica-group based selection, select all instances if not specified")
   private int _numInstancesPerPartition;
 
   @JsonProperty
@@ -72,23 +72,23 @@ public class InstanceReplicaPartitionConfig {
   }
 
   @JsonProperty
-  public int getNumReplicas() {
-    return _numReplicas;
+  public int getNumReplicaGroups() {
+    return _numReplicaGroups;
   }
 
   @JsonProperty
-  public void setNumReplicas(int numReplicas) {
-    _numReplicas = numReplicas;
+  public void setNumReplicaGroups(int numReplicaGroups) {
+    _numReplicaGroups = numReplicaGroups;
   }
 
   @JsonProperty
-  public int getNumInstancesPerReplica() {
-    return _numInstancesPerReplica;
+  public int getNumInstancesPerReplicaGroup() {
+    return _numInstancesPerReplicaGroup;
   }
 
   @JsonProperty
-  public void setNumInstancesPerReplica(int numInstancesPerReplica) {
-    _numInstancesPerReplica = numInstancesPerReplica;
+  public void setNumInstancesPerReplicaGroup(int numInstancesPerReplicaGroup) {
+    _numInstancesPerReplicaGroup = numInstancesPerReplicaGroup;
   }
 
   @JsonProperty
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
index b024d16..2675de8 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
 import org.apache.pinot.common.config.instance.InstanceConstraintConfig;
-import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import 
org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -358,11 +358,11 @@ public class TableConfigTest {
       constraintConfig.setConstraints(Arrays.asList("constraint1", 
"constraint2"));
       instanceAssignmentConfig.setConstraintConfig(constraintConfig);
 
-      InstanceReplicaPartitionConfig replicaPartitionConfig = new 
InstanceReplicaPartitionConfig();
-      replicaPartitionConfig.setReplicaGroupBased(true);
-      replicaPartitionConfig.setNumReplicas(3);
-      replicaPartitionConfig.setNumInstancesPerReplica(5);
-      
instanceAssignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig);
+      InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig();
+      replicaGroupPartitionConfig.setReplicaGroupBased(true);
+      replicaGroupPartitionConfig.setNumReplicaGroups(3);
+      replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(5);
+      
instanceAssignmentConfig.setReplicaGroupPartitionConfig(replicaGroupPartitionConfig);
 
       TableConfig tableConfig = 
tableConfigBuilder.setInstanceAssignmentConfigMap(
           Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instanceAssignmentConfig)).build();
@@ -460,12 +460,13 @@ public class TableConfigTest {
     InstanceConstraintConfig constraintConfig = 
instanceAssignmentConfig.getConstraintConfig();
     assertEquals(constraintConfig.getConstraints(), 
Arrays.asList("constraint1", "constraint2"));
 
-    InstanceReplicaPartitionConfig replicaPartitionConfig = 
instanceAssignmentConfig.getReplicaPartitionConfig();
-    assertTrue(replicaPartitionConfig.isReplicaGroupBased());
-    assertEquals(replicaPartitionConfig.getNumInstances(), 0);
-    assertEquals(replicaPartitionConfig.getNumReplicas(), 3);
-    assertEquals(replicaPartitionConfig.getNumInstancesPerReplica(), 5);
-    assertEquals(replicaPartitionConfig.getNumPartitions(), 0);
-    assertEquals(replicaPartitionConfig.getNumInstancesPerPartition(), 0);
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+        instanceAssignmentConfig.getReplicaGroupPartitionConfig();
+    assertTrue(replicaGroupPartitionConfig.isReplicaGroupBased());
+    assertEquals(replicaGroupPartitionConfig.getNumInstances(), 0);
+    assertEquals(replicaGroupPartitionConfig.getNumReplicaGroups(), 3);
+    assertEquals(replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(), 
5);
+    assertEquals(replicaGroupPartitionConfig.getNumPartitions(), 0);
+    assertEquals(replicaGroupPartitionConfig.getNumInstancesPerPartition(), 0);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
index 4060f4b..53c48d4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
@@ -33,19 +33,21 @@ import org.apache.pinot.common.utils.JsonUtils;
 /**
  * Instance (server) partitions for the table.
  *
- * <p>The instance partitions is stored as a map from partition of the format: 
{@code <partitionId>_<replicaId>} to
+ * <p>The instance partitions is stored as a map from partition of the format: 
{@code <partitionId>_<replicaGroupId>} to
  * list of server instances, and is persisted under the ZK path: {@code 
<cluster>/PROPERTYSTORE/INSTANCE_PARTITIONS}.
+ * <p>When partition is not enabled, all instances will be stored as partition 
0.
+ * <p>When replica-group is not enabled, all instances will be stored as 
replica-group 0.
  * <p>The segment assignment will be based on the instance partitions of the 
table.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class InstancePartitions {
-  private static final char PARTITION_REPLICA_SEPARATOR = '_';
+  private static final char PARTITION_REPLICA_GROUP_SEPARATOR = '_';
 
   // Name will be of the format "<rawTableName>_<type>", e.g. "table_OFFLINE", 
"table_CONSUMING", "table_COMPLETED"
   private final String _name;
   private final Map<String, List<String>> _partitionToInstancesMap;
   private int _numPartitions;
-  private int _numReplicas;
+  private int _numReplicaGroups;
 
   public InstancePartitions(String name) {
     _name = name;
@@ -58,11 +60,11 @@ public class InstancePartitions {
     _name = name;
     _partitionToInstancesMap = partitionToInstancesMap;
     for (String key : partitionToInstancesMap.keySet()) {
-      int splitterIndex = key.indexOf(PARTITION_REPLICA_SEPARATOR);
-      int partition = Integer.parseInt(key.substring(0, splitterIndex));
-      int replica = Integer.parseInt(key.substring(splitterIndex + 1));
-      _numPartitions = Integer.max(_numPartitions, partition + 1);
-      _numReplicas = Integer.max(_numReplicas, replica + 1);
+      int separatorIndex = key.indexOf(PARTITION_REPLICA_GROUP_SEPARATOR);
+      int partitionId = Integer.parseInt(key.substring(0, separatorIndex));
+      int replicaGroupId = Integer.parseInt(key.substring(separatorIndex + 1));
+      _numPartitions = Integer.max(_numPartitions, partitionId + 1);
+      _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
     }
   }
 
@@ -82,19 +84,20 @@ public class InstancePartitions {
   }
 
   @JsonIgnore
-  public int getNumReplicas() {
-    return _numReplicas;
+  public int getNumReplicaGroups() {
+    return _numReplicaGroups;
   }
 
-  public List<String> getInstances(int partitionId, int replicaId) {
-    return _partitionToInstancesMap.get(Integer.toString(partitionId) + 
PARTITION_REPLICA_SEPARATOR + replicaId);
+  public List<String> getInstances(int partitionId, int replicaGroupId) {
+    return _partitionToInstancesMap
+        .get(Integer.toString(partitionId) + PARTITION_REPLICA_GROUP_SEPARATOR 
+ replicaGroupId);
   }
 
-  public void setInstances(int partitionId, int replicaId, List<String> 
instances) {
-    String key = Integer.toString(partitionId) + PARTITION_REPLICA_SEPARATOR + 
replicaId;
+  public void setInstances(int partitionId, int replicaGroupId, List<String> 
instances) {
+    String key = Integer.toString(partitionId) + 
PARTITION_REPLICA_GROUP_SEPARATOR + replicaGroupId;
     _partitionToInstancesMap.put(key, instances);
     _numPartitions = Integer.max(_numPartitions, partitionId + 1);
-    _numReplicas = Integer.max(_numReplicas, replicaId + 1);
+    _numReplicaGroups = Integer.max(_numReplicaGroups, replicaGroupId + 1);
   }
 
   public static InstancePartitions fromZNRecord(ZNRecord znRecord) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index a715834..b87b123 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -28,7 +28,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.config.instance.InstanceConstraintConfig;
-import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import 
org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * <ul>
  *   <li>Select instances based on the tag/pool configuration</li>
  *   <li>Apply constraints to the instances (optional, multiple constraints 
can be chained up)</li>
- *   <li>Select instances based on the replica/partition configuration</li>
+ *   <li>Select instances based on the replica-group/partition 
configuration</li>
  * </ul>
  */
 public class InstanceAssignmentDriver {
@@ -77,10 +77,10 @@ public class InstanceAssignmentDriver {
       poolToInstanceConfigsMap = 
constraintApplier.applyConstraint(poolToInstanceConfigsMap);
     }
 
-    InstanceReplicaPartitionConfig replicaPartitionConfig = 
assignmentConfig.getReplicaPartitionConfig();
-    Preconditions.checkState(replicaPartitionConfig != null, "Instance 
replica/partition config is missing");
-    InstanceReplicaPartitionSelector replicaPartitionSelector =
-        new InstanceReplicaPartitionSelector(replicaPartitionConfig, 
tableNameWithType);
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = 
assignmentConfig.getReplicaGroupPartitionConfig();
+    Preconditions.checkState(replicaGroupPartitionConfig != null, "Instance 
replica-group/partition config is missing");
+    InstanceReplicaGroupPartitionSelector replicaPartitionSelector =
+        new InstanceReplicaGroupPartitionSelector(replicaGroupPartitionConfig, 
tableNameWithType);
     InstancePartitions instancePartitions = new InstancePartitions(
         
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
     replicaPartitionSelector.selectInstances(poolToInstanceConfigsMap, 
instancePartitions);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
similarity index 52%
rename from 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
rename to 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index 90c35fb..9102d4a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaPartitionSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -24,31 +24,30 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import 
org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The instance replica/partition selector is responsible for selecting the 
instances for each replica and partition.
- * <p>NOTE: The replica here refers to the table level replica 
(replica-group), which is a set of instances that contain
- * all segments for a table.
+ * The instance replica-group/partition selector is responsible for selecting 
the instances for each replica-group and
+ * partition.
  */
-public class InstanceReplicaPartitionSelector {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceReplicaPartitionSelector.class);
+public class InstanceReplicaGroupPartitionSelector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
 
-  private final InstanceReplicaPartitionConfig _replicaPartitionConfig;
+  private final InstanceReplicaGroupPartitionConfig 
_replicaGroupPartitionConfig;
   private final String _tableNameWithType;
 
-  public InstanceReplicaPartitionSelector(InstanceReplicaPartitionConfig 
replicaPartitionConfig,
+  public 
InstanceReplicaGroupPartitionSelector(InstanceReplicaGroupPartitionConfig 
replicaGroupPartitionConfig,
       String tableNameWithType) {
-    _replicaPartitionConfig = replicaPartitionConfig;
+    _replicaGroupPartitionConfig = replicaGroupPartitionConfig;
     _tableNameWithType = tableNameWithType;
   }
 
   /**
-   * Selects instances based on the replica and partition config, and stores 
the result into the given instance
+   * Selects instances based on the replica-group/partition config, and stores 
the result into the given instance
    * partitions.
    */
   public void selectInstances(Map<Integer, List<InstanceConfig>> 
poolToInstanceConfigsMap,
@@ -59,99 +58,107 @@ public class InstanceReplicaPartitionSelector {
     int tableNameHash = Math.abs(_tableNameWithType.hashCode());
     List<Integer> pools = new ArrayList<>(poolToInstanceConfigsMap.keySet());
     pools.sort(null);
-    LOGGER.info("Starting instance replica/partition selection for table: {} 
with hash: {} from pools: {}",
+    LOGGER.info("Starting instance replica-group/partition selection for 
table: {} with hash: {} from pools: {}",
         _tableNameWithType, tableNameHash, pools);
 
-    if (_replicaPartitionConfig.isReplicaGroupBased()) {
+    if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
       // Replica-group based selection
 
-      int numReplicas = _replicaPartitionConfig.getNumReplicas();
-      Preconditions.checkState(numReplicas > 0, "Number of replicas must be 
positive");
-      Map<Integer, List<Integer>> poolToReplicaIdsMap = new TreeMap<>();
-      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-        // Pick one pool for each replica based on the table name hash
+      int numReplicaGroups = 
_replicaGroupPartitionConfig.getNumReplicaGroups();
+      Preconditions.checkState(numReplicaGroups > 0, "Number of replica-groups 
must be positive");
+      Map<Integer, List<Integer>> poolToReplicaGroupIdsMap = new TreeMap<>();
+      for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) {
+        // Pick one pool for each replica-group based on the table name hash
         int pool = pools.get((tableNameHash + replicaId) % numPools);
-        poolToReplicaIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
+        poolToReplicaGroupIdsMap.computeIfAbsent(pool, k -> new 
ArrayList<>()).add(replicaId);
       }
-      LOGGER.info("Selecting {} replicas from pool: {} for table: {}", 
numReplicas, poolToReplicaIdsMap,
+      LOGGER.info("Selecting {} replica-groups from pool: {} for table: {}", 
numReplicaGroups, poolToReplicaGroupIdsMap,
           _tableNameWithType);
 
-      int numInstancesPerReplica = 
_replicaPartitionConfig.getNumInstancesPerReplica();
-      if (numInstancesPerReplica > 0) {
-        // Check if we have enough instances if number of instances per 
replica is configured
-        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaIdsMap.entrySet()) {
+      int numInstancesPerReplicaGroup = 
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+      if (numInstancesPerReplicaGroup > 0) {
+        // Check if we have enough instances if number of instances per 
replica-group is configured
+        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaGroupIdsMap.entrySet()) {
           int pool = entry.getKey();
           int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          int numInstancesToSelect = numInstancesPerReplica * 
entry.getValue().size();
+          int numInstancesToSelect = numInstancesPerReplicaGroup * 
entry.getValue().size();
           Preconditions.checkState(numInstancesToSelect <= numInstancesInPool,
               "Not enough qualified instances from pool: %s (%s in the pool, 
asked for %s)", pool, numInstancesInPool,
               numInstancesToSelect);
         }
       } else {
-        // Use as many instances as possible if number of instances per 
replica is not configured
-        numInstancesPerReplica = Integer.MAX_VALUE;
-        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaIdsMap.entrySet()) {
+        // Use as many instances as possible if number of instances per 
replica-group is not configured
+        numInstancesPerReplicaGroup = Integer.MAX_VALUE;
+        for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaGroupIdsMap.entrySet()) {
           int pool = entry.getKey();
-          int numReplicasInPool = entry.getValue().size();
+          int numReplicaGroupsInPool = entry.getValue().size();
           int numInstancesInPool = poolToInstanceConfigsMap.get(pool).size();
-          Preconditions.checkState(numReplicasInPool <= numInstancesInPool,
-              "Not enough qualified instances from pool: %s, cannot select %s 
replicas from %s instances", pool,
-              numReplicasInPool, numInstancesInPool);
-          numInstancesPerReplica = Math.min(numInstancesPerReplica, 
numInstancesInPool / numReplicasInPool);
+          Preconditions.checkState(numReplicaGroupsInPool <= 
numInstancesInPool,
+              "Not enough qualified instances from pool: %s, cannot select %s 
replica-groups from %s instances", pool,
+              numReplicaGroupsInPool, numInstancesInPool);
+          numInstancesPerReplicaGroup =
+              Math.min(numInstancesPerReplicaGroup, numInstancesInPool / 
numReplicaGroupsInPool);
         }
       }
-      LOGGER.info("Selecting {} instances per replica for table: {}", 
numInstancesPerReplica, _tableNameWithType);
+      LOGGER.info("Selecting {} instances per replica-group for table: {}", 
numInstancesPerReplicaGroup,
+          _tableNameWithType);
 
-      String[][] replicaIdToInstancesMap = new 
String[numReplicas][numInstancesPerReplica];
-      for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaIdsMap.entrySet()) {
+      String[][] replicaGroupIdToInstancesMap = new 
String[numReplicaGroups][numInstancesPerReplicaGroup];
+      for (Map.Entry<Integer, List<Integer>> entry : 
poolToReplicaGroupIdsMap.entrySet()) {
         List<InstanceConfig> instanceConfigsInPool = 
poolToInstanceConfigsMap.get(entry.getKey());
-        List<Integer> replicaIdsInPool = entry.getValue();
+        List<Integer> replicaGroupIdsInPool = entry.getValue();
 
-        // Use round-robin to assign instances to each replica so that they 
get instances with similar picking priority
+        // Use round-robin to assign instances to each replica-group so that 
they get instances with similar picking
+        // priority
+        // E.g. (within a pool, 10 instances, 2 replica-groups, 3 instances 
per replica-group)
+        // [i0, i1, i2, i3, i4, i5, i6, i7, i8, i9]
+        //  r0  r1  r0  r1  r0  r1
         int instanceIdInPool = 0;
-        for (int instanceIdInReplica = 0; instanceIdInReplica < 
numInstancesPerReplica; instanceIdInReplica++) {
-          for (int replicaId : replicaIdsInPool) {
-            replicaIdToInstancesMap[replicaId][instanceIdInReplica] =
+        for (int instanceIdInReplicaGroup = 0; instanceIdInReplicaGroup < 
numInstancesPerReplicaGroup;
+            instanceIdInReplicaGroup++) {
+          for (int replicaGroupId : replicaGroupIdsInPool) {
+            
replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup] =
                 
instanceConfigsInPool.get(instanceIdInPool++).getInstanceName();
           }
         }
       }
 
-      // Assign instances within a replica to one partition if not configured
-      int numPartitions = _replicaPartitionConfig.getNumPartitions();
+      // Assign instances within a replica-group to one partition if not 
configured
+      int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
       if (numPartitions <= 0) {
         numPartitions = 1;
       }
-      // Assign all instances within a replica to each partition if not 
configured
-      int numInstancesPerPartition = 
_replicaPartitionConfig.getNumInstancesPerPartition();
+      // Assign all instances within a replica-group to each partition if not 
configured
+      int numInstancesPerPartition = 
_replicaGroupPartitionConfig.getNumInstancesPerPartition();
       if (numInstancesPerPartition > 0) {
-        Preconditions.checkState(numInstancesPerPartition <= 
numInstancesPerReplica,
-            "Number of instances per partition: %s must be smaller or equal to 
number of instances per replica: %s",
-            numInstancesPerPartition, numInstancesPerReplica);
+        Preconditions.checkState(numInstancesPerPartition <= 
numInstancesPerReplicaGroup,
+            "Number of instances per partition: %s must be smaller or equal to 
number of instances per replica-group: %s",
+            numInstancesPerPartition, numInstancesPerReplicaGroup);
       } else {
-        numInstancesPerPartition = numInstancesPerReplica;
+        numInstancesPerPartition = numInstancesPerReplicaGroup;
       }
-      LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica for table: {}", numPartitions,
-          numInstancesPerPartition, _tableNameWithType);
+      LOGGER.info("Selecting {} partitions, {} instances per partition within 
a replica-group for table: {}",
+          numPartitions, numInstancesPerPartition, _tableNameWithType);
 
-      // Assign consecutive instances within a replica to each partition.
-      // E.g. (within a replica, 5 instances, 3 partitions, 3 instances per 
partition)
+      // Assign consecutive instances within a replica-group to each partition.
+      // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances 
per partition)
       // [i0, i1, i2, i3, i4]
       //  p0  p0  p0  p1  p1
       //  p1  p2  p2  p2
-      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-        int instanceIdInReplica = 0;
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
+        int instanceIdInReplicaGroup = 0;
         for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
           List<String> instancesInPartition = new 
ArrayList<>(numInstancesPerPartition);
           for (int instanceIdInPartition = 0; instanceIdInPartition < 
numInstancesPerPartition;
               instanceIdInPartition++) {
-            
instancesInPartition.add(replicaIdToInstancesMap[replicaId][instanceIdInReplica]);
-            instanceIdInReplica = (instanceIdInReplica + 1) % 
numInstancesPerReplica;
+            
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
+            instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) % 
numInstancesPerReplicaGroup;
           }
           instancesInPartition.sort(null);
-          LOGGER.info("Selecting instances: {} for replica: {}, partition: {} 
for table: {}", instancesInPartition,
-              replicaId, partitionId, _tableNameWithType);
-          instancePartitions.setInstances(partitionId, replicaId, 
instancesInPartition);
+          LOGGER
+              .info("Selecting instances: {} for replica-group: {}, partition: 
{} for table: {}", instancesInPartition,
+                  replicaGroupId, partitionId, _tableNameWithType);
+          instancePartitions.setInstances(partitionId, replicaGroupId, 
instancesInPartition);
         }
       }
     } else {
@@ -164,7 +171,7 @@ public class InstanceReplicaPartitionSelector {
       int numInstanceConfigs = instanceConfigs.size();
 
       // Assign all instances if not configured
-      int numInstancesToSelect = _replicaPartitionConfig.getNumInstances();
+      int numInstancesToSelect = 
_replicaGroupPartitionConfig.getNumInstances();
       if (numInstancesToSelect > 0) {
         Preconditions.checkState(numInstancesToSelect <= numInstanceConfigs,
             "Not enough qualified instances from pool: %s (%s in the pool, 
asked for %s)", pool, numInstanceConfigs,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
index 5dfb034..f8c7fee 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineSegmentAssignment.java
@@ -45,21 +45,22 @@ import org.slf4j.LoggerFactory;
  * Segment assignment for offline table.
  * <ul>
  *   <li>
- *     Non-replica-group based assignment (only 1 replica in instance 
partitions):
+ *     Non-replica-group based assignment (only 1 replica-group in instance 
partitions):
  *     <p>Assign the segment to the instance with the least number of 
segments. In case of a tie, assign the segment to
  *     the instance with the smallest index in the list. Use Helix 
AutoRebalanceStrategy to rebalance the table.
  *   </li>
  *   <li>
- *     Replica-group based assignment (more than 1 replicas in instance 
partitions):
- *     <p>Among replicas, always mirror the assignment (pick the same index of 
the instance).
- *     <p>Within each partition, assign the segment to the servers with the 
least segments already assigned. In case of
- *     a tie, assign to the server with the smallest index in the list. Do 
this for one replica and mirror the
- *     assignment to other replicas.
- *     <p>To rebalance a table, within each partition, first calculate the 
number of segments on each server, loop over
- *     all the segments and keep the assignment if number of segments for the 
server has not been reached and track the
- *     not assigned segments, then assign the left-over segments to the 
servers with the least segments, or the smallest
- *     index if there is a tie. Repeat the process for all the partitions in 
one replica, and mirror the assignment to
- *     other replicas. With this greedy algorithm, the result is deterministic 
and with minimum segment moves.
+ *     Replica-group based assignment (more than 1 replica-group in instance 
partitions):
+ *     <p>Among replica-groups, always mirror the assignment (pick the same 
index of the instance).
+ *     <p>Within each partition, assign the segment to the instances with the 
least segments already assigned. In case
+ *     of a tie, assign to the instance with the smallest index in the list. 
Do this for one replica-group and mirror
+ *     the assignment to other replica-groups.
+ *     <p>To rebalance a table, within each partition, first calculate the 
number of segments on each instance, loop
+ *     over all the segments and keep the assignment if number of segments for 
the instance has not been reached and
+ *     track the not assigned segments, then assign the left-over segments to 
the instances with the least segments, or
+ *     the smallest index if there is a tie. Repeat the process for all the 
partitions in one replica-group, and mirror
+ *     the assignment to other replica-groups. With this greedy algorithm, the 
result is deterministic and with minimum
+ *     segment moves.
  *   </li>
  * </ul>
  */
@@ -76,8 +77,9 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
     _helixManager = helixManager;
     _offlineTableName = tableConfig.getTableName();
     _replication = tableConfig.getValidationConfig().getReplicationNumber();
-    ReplicaGroupStrategyConfig strategyConfig = 
tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
-    _partitionColumn = strategyConfig != null ? 
strategyConfig.getPartitionColumn() : null;
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+        tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
+    _partitionColumn = replicaGroupStrategyConfig != null ? 
replicaGroupStrategyConfig.getPartitionColumn() : null;
 
     if (_partitionColumn == null) {
       LOGGER.info("Initialized OfflineSegmentAssignment with replication: {} 
without partition column for table: {} ",
@@ -98,7 +100,7 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
         _offlineTableName);
 
     List<String> instancesAssigned;
-    if (instancePartitions.getNumReplicas() == 1) {
+    if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based assignment
 
       // Assign the segment to the instance with the least segments, or the 
smallest id if there is a tie
@@ -118,11 +120,11 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
     } else {
       // Replica-group based assignment
 
-      int numReplicas = instancePartitions.getNumReplicas();
-      if (numReplicas != _replication) {
+      int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+      if (numReplicaGroups != _replication) {
         LOGGER.warn(
-            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
-            instancePartitions.getName(), numReplicas, _replication, 
_offlineTableName, numReplicas);
+            "Number of replica-groups in instance partitions {}: {} does not 
match replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicaGroups, _replication, 
_offlineTableName, numReplicaGroups);
       }
 
       // Fetch partition id from segment ZK metadata if partition column is 
configured
@@ -152,7 +154,7 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
             segmentPartitionId, partitionId, numPartitions, _offlineTableName);
       }
 
-      // First assign the segment to replica 0
+      // First assign the segment to replica-group 0
       List<String> instances = instancePartitions.getInstances(partitionId, 0);
       int[] numSegmentsAssignedPerInstance =
           
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);
@@ -166,11 +168,11 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
         }
       }
 
-      // Mirror the assignment to all replicas
-      instancesAssigned = new ArrayList<>(numReplicas);
-      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
+      // Mirror the assignment to all replica-groups
+      instancesAssigned = new ArrayList<>(numReplicaGroups);
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
         instancesAssigned
-            .add(instancePartitions.getInstances(partitionId, 
replicaId).get(instanceIdWithLeastSegmentsAssigned));
+            .add(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
       }
     }
 
@@ -188,7 +190,7 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
     LOGGER.info("Rebalancing table: {} with instance partitions: {}", 
_offlineTableName, instancePartitions);
 
     Map<String, Map<String, String>> newAssignment;
-    if (instancePartitions.getNumReplicas() == 1) {
+    if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based assignment
 
       List<String> instances =
@@ -198,11 +200,11 @@ public class OfflineSegmentAssignment implements 
SegmentAssignment {
     } else {
       // Replica-group based assignment
 
-      int numReplicas = instancePartitions.getNumReplicas();
-      if (numReplicas != _replication) {
+      int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+      if (numReplicaGroups != _replication) {
         LOGGER.warn(
-            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
-            instancePartitions.getName(), numReplicas, _replication, 
_offlineTableName, numReplicas);
+            "Number of replica-groups in instance partitions {}: {} does not 
match replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicaGroups, _replication, 
_offlineTableName, numReplicaGroups);
       }
 
       if (_partitionColumn == null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index ab279fb..35aa136 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -45,9 +45,9 @@ import org.slf4j.LoggerFactory;
  *     differences:
  *     <ul>
  *       <li>
- *         1. Within a replica, all segments of the same partition (steam 
partition) are always assigned to exactly one
- *         server, and because of that we can directly assign or rebalance the 
CONSUMING segments to the servers based
- *         on the partition id
+ *         1. Within a replica-group, all segments of the same partition 
(stream partition) are always assigned to
+ *         exactly one instance, and because of that we can directly 
assign or rebalance the CONSUMING segments to
+ *         the instances based on the partition id
  *       </li>
  *       <li>
  *         2. Partition id for an instance is derived from the index of the 
instance (within the replica-group for
@@ -99,10 +99,10 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
   private List<String> assignSegment(String segmentName, InstancePartitions 
instancePartitions) {
     int partitionId = new LLCSegmentName(segmentName).getPartitionId();
 
-    if (instancePartitions.getNumReplicas() == 1) {
+    if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based assignment:
       // Uniformly spray the partitions and replicas across the instances.
-      // E.g. (6 servers, 3 partitions, 4 replicas)
+      // E.g. (6 instances, 3 partitions, 4 replicas)
       // "0_0": [i0,  i1,  i2,  i3,  i4,  i5  ]
       //         p0r0 p0r1 p0r2 p1r3 p1r0 p1r1
       //         p1r2 p1r3 p2r0 p2r1 p2r2 p2r3
@@ -117,22 +117,22 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
       return instancesAssigned;
     } else {
       // Replica-group based assignment:
-      // Within a replica, uniformly spray the partitions across the instances.
-      // E.g. (within a replica, 3 servers, 6 partitions)
+      // Within a replica-group, uniformly spray the partitions across the 
instances.
+      // E.g. (within a replica-group, 3 instances, 6 partitions)
       // "0_0": [i0, i1, i2]
       //         p0  p1  p2
       //         p3  p4  p5
 
-      int numReplicas = instancePartitions.getNumReplicas();
-      if (numReplicas != _replication) {
+      int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+      if (numReplicaGroups != _replication) {
         LOGGER.warn(
-            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
-            instancePartitions.getName(), numReplicas, _replication, 
_realtimeTableName, numReplicas);
+            "Number of replica-groups in instance partitions {}: {} does not 
match replication in table config: {} for table: {}, use: {}",
+            instancePartitions.getName(), numReplicaGroups, _replication, 
_realtimeTableName, numReplicaGroups);
       }
 
-      List<String> instancesAssigned = new ArrayList<>(numReplicas);
-      for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-        List<String> instances = instancePartitions.getInstances(0, replicaId);
+      List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
+      for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
+        List<String> instances = instancePartitions.getInstances(0, 
replicaGroupId);
         instancesAssigned.add(instances.get(partitionId % instances.size()));
       }
       return instancesAssigned;
@@ -156,7 +156,7 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
         completedInstancePartitions);
 
     Map<String, Map<String, String>> newAssignment;
-    if (completedInstancePartitions.getNumReplicas() == 1) {
+    if (completedInstancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based assignment
 
       List<String> instances = SegmentAssignmentUtils
@@ -166,11 +166,12 @@ public class RealtimeSegmentAssignment implements 
SegmentAssignment {
     } else {
       // Replica-group based assignment
 
-      int numReplicas = completedInstancePartitions.getNumReplicas();
-      if (numReplicas != _replication) {
+      int numReplicaGroups = completedInstancePartitions.getNumReplicaGroups();
+      if (numReplicaGroups != _replication) {
         LOGGER.warn(
-            "Number of replicas in instance partitions {}: {} does not match 
replication in table config: {} for table: {}, use: {}",
-            completedInstancePartitions.getName(), numReplicas, _replication, 
_realtimeTableName, numReplicas);
+            "Number of replica-groups in instance partitions {}: {} does not 
match replication in table config: {} for table: {}, use: {}",
+            completedInstancePartitions.getName(), numReplicaGroups, 
_replication, _realtimeTableName,
+            numReplicaGroups);
       }
 
       Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 38d9681..878f913 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -73,9 +73,10 @@ class SegmentAssignmentUtils {
    */
   static List<String> 
getInstancesForNonReplicaGroupBasedAssignment(InstancePartitions 
instancePartitions,
       int replication) {
-    Preconditions.checkState(instancePartitions.getNumReplicas() == 1 && 
instancePartitions.getNumPartitions() == 1,
-        "Instance partitions: %s should contain 1 replica and 1 partition for 
non-replica-group based assignment",
-        instancePartitions.getName());
+    Preconditions
+        .checkState(instancePartitions.getNumReplicaGroups() == 1 && 
instancePartitions.getNumPartitions() == 1,
+            "Instance partitions: %s should contain 1 replica and 1 partition 
for non-replica-group based assignment",
+            instancePartitions.getName());
     List<String> instances = instancePartitions.getInstances(0, 0);
     int numInstances = instances.size();
     Preconditions.checkState(numInstances >= replication,
@@ -129,25 +130,25 @@ class SegmentAssignmentUtils {
    * Rebalances one partition of the table for the replica-group based segment 
assignment strategy.
    * <ul>
    *   <li>
-   *     1. Calculate the target number of segments on each server
+   *     1. Calculate the target number of segments on each instance
    *   </li>
    *   <li>
-   *     2. Loop over all the segments and keep the assignment if target 
number of segments for the server has not been
-   *     reached and track the not assigned segments
+   *     2. Loop over all the segments and keep the assignment if target 
number of segments for the instance has not
+   *     been reached and track the not assigned segments
    *   </li>
    *   <li>
-   *     3. Assign the left-over segments to the servers with the least 
segments, or the smallest index if there is a
+   *     3. Assign the left-over segments to the instances with the least 
segments, or the smallest index if there is a
    *     tie
    *   </li>
    *   <li>
-   *     4. Mirror the assignment to other replicas
+   *     4. Mirror the assignment to other replica-groups
    *   </li>
    * </ul>
    */
   static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, 
String>> currentAssignment,
       InstancePartitions instancePartitions, int partitionId, Set<String> 
segments,
       Map<String, Map<String, String>> newAssignment) {
-    // Fetch instances in replica 0
+    // Fetch instances in replica-group 0
     List<String> instances = instancePartitions.getInstances(partitionId, 0);
     Map<String, Integer> instanceNameToIdMap = 
SegmentAssignmentUtils.getInstanceNameToIdMap(instances);
 
@@ -204,9 +205,9 @@ class SegmentAssignmentUtils {
   private static Map<String, String> 
getReplicaGroupBasedInstanceStateMap(InstancePartitions instancePartitions,
       int partitionId, int instanceId) {
     Map<String, String> instanceStateMap = new TreeMap<>();
-    int numReplicas = instancePartitions.getNumReplicas();
-    for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
-      instanceStateMap.put(instancePartitions.getInstances(partitionId, 
replicaId).get(instanceId),
+    int numReplicaGroups = instancePartitions.getNumReplicaGroups();
+    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; 
replicaGroupId++) {
+      instanceStateMap.put(instancePartitions.getInstances(partitionId, 
replicaGroupId).get(instanceId),
           SegmentOnlineOfflineStateModel.ONLINE);
     }
     return instanceStateMap;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
index 004ff1f..8daecde 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
@@ -29,7 +29,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.config.Tenant;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
-import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import 
org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.common.data.Schema;
@@ -109,12 +109,12 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
       // Expected
     }
 
-    // Add OFFLINE instance assignment config to the OFFLINE table config
+    // Add OFFLINE instance assignment config to the offline table config
     InstanceAssignmentConfig offlineInstanceAssignmentConfig = new 
InstanceAssignmentConfig();
     InstanceTagPoolConfig offlineInstanceTagPoolConfig = new 
InstanceTagPoolConfig();
     
offlineInstanceTagPoolConfig.setTag(TagNameUtils.getOfflineTagForTenant(TENANT_NAME));
     
offlineInstanceAssignmentConfig.setTagPoolConfig(offlineInstanceTagPoolConfig);
-    offlineInstanceAssignmentConfig.setReplicaPartitionConfig(new 
InstanceReplicaPartitionConfig());
+    offlineInstanceAssignmentConfig.setReplicaGroupPartitionConfig(new 
InstanceReplicaGroupPartitionConfig());
     offlineTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.OFFLINE, 
offlineInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(offlineTableConfig);
@@ -124,17 +124,17 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     assertEquals(instancePartitionsMap.size(), 1);
     InstancePartitions offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
     assertNotNull(offlineInstancePartitions);
-    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0).size(), 1);
     String offlineInstanceId = offlineInstancePartitions.getInstances(0, 
0).get(0);
 
-    // Add CONSUMING instance assignment config to the REALTIME table config
+    // Add CONSUMING instance assignment config to the real-time table config
     InstanceAssignmentConfig consumingInstanceAssignmentConfig = new 
InstanceAssignmentConfig();
     InstanceTagPoolConfig realtimeInstanceTagPoolConfig = new 
InstanceTagPoolConfig();
     
realtimeInstanceTagPoolConfig.setTag(TagNameUtils.getRealtimeTagForTenant(TENANT_NAME));
     
consumingInstanceAssignmentConfig.setTagPoolConfig(realtimeInstanceTagPoolConfig);
-    consumingInstanceAssignmentConfig.setReplicaPartitionConfig(new 
InstanceReplicaPartitionConfig());
+    consumingInstanceAssignmentConfig.setReplicaGroupPartitionConfig(new 
InstanceReplicaGroupPartitionConfig());
     realtimeTableConfig.setInstanceAssignmentConfigMap(
         Collections.singletonMap(InstancePartitionsType.CONSUMING, 
consumingInstanceAssignmentConfig));
     _helixResourceManager.setExistingTableConfig(realtimeTableConfig);
@@ -144,15 +144,15 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     assertEquals(instancePartitionsMap.size(), 2);
     offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
     assertNotNull(offlineInstancePartitions);
-    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
     InstancePartitions consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
     assertNotNull(consumingInstancePartitions);
-    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
     assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
-    String realtimeInstanceId = consumingInstancePartitions.getInstances(0, 
0).get(0);
+    String consumingInstanceId = consumingInstancePartitions.getInstances(0, 
0).get(0);
 
     // Use OFFLINE instance assignment config as the COMPLETED instance 
assignment config
     realtimeTableConfig
@@ -167,16 +167,16 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     assertEquals(instancePartitionsMap.size(), 3);
     offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
     assertNotNull(offlineInstancePartitions);
-    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
     consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
     assertNotNull(consumingInstancePartitions);
-    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
-    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(realtimeInstanceId));
+    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(consumingInstanceId));
     InstancePartitions completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
-    assertEquals(completedInstancePartitions.getNumReplicas(), 1);
+    assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
 
@@ -200,7 +200,7 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
           instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
     }
 
-    // Remove the instance partitions for both OFFLINE and REALTIME table
+    // Remove the instance partitions for both offline and real-time table
     
sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
 null));
     try {
       getInstancePartitionsMap();
@@ -215,16 +215,16 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     assertEquals(instancePartitionsMap.size(), 3);
     offlineInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
     assertNotNull(offlineInstancePartitions);
-    assertEquals(offlineInstancePartitions.getNumReplicas(), 1);
+    assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
     assertEquals(offlineInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
     consumingInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
     assertNotNull(consumingInstancePartitions);
-    assertEquals(consumingInstancePartitions.getNumReplicas(), 1);
+    assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
-    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(realtimeInstanceId));
+    assertEquals(consumingInstancePartitions.getInstances(0, 0), 
Collections.singletonList(consumingInstanceId));
     completedInstancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
-    assertEquals(completedInstancePartitions.getNumReplicas(), 1);
+    assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
     assertEquals(completedInstancePartitions.getNumPartitions(), 1);
     assertEquals(completedInstancePartitions.getInstances(0, 0), 
Collections.singletonList(offlineInstanceId));
 
@@ -236,14 +236,14 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
       // Expected
     }
 
-    // Assign instances for both OFFLINE and REALTIME table
+    // Assign instances for both offline and real-time table
     
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME, 
null, false), null);
 
     // Instance partitions should be persisted
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 3);
 
-    // Remove the instance partitions for REALTIME table
+    // Remove the instance partitions for real-time table
     sendDeleteRequest(_controllerRequestURLBuilder
         
.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
 null));
     instancePartitionsMap = getInstancePartitionsMap();
@@ -262,19 +262,19 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
     
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
     
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
 
-    // Replace OFFLINE instance with REALTIME instance for COMPLETED instance 
partitions
+    // Replace OFFLINE instance with CONSUMING instance for COMPLETED instance 
partitions
     instancePartitionsMap = 
deserializeInstancePartitionsMap(sendPostRequest(_controllerRequestURLBuilder
-            .forInstanceReplace(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED, offlineInstanceId, realtimeInstanceId),
+            .forInstanceReplace(RAW_TABLE_NAME, 
InstancePartitionsType.COMPLETED, offlineInstanceId, consumingInstanceId),
         null));
     assertEquals(instancePartitionsMap.size(), 1);
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
-        Collections.singletonList(realtimeInstanceId));
+        Collections.singletonList(consumingInstanceId));
 
-    // Replace the instance again using REALTIME table name (old instance does 
not exist)
+    // Replace the instance again using real-time table name (old instance 
does not exist)
     try {
       sendPostRequest(_controllerRequestURLBuilder
           
.forInstanceReplace(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
 null, offlineInstanceId,
-              realtimeInstanceId), null);
+              consumingInstanceId), null);
       fail();
     } catch (FileNotFoundException e) {
       // Expected
@@ -286,27 +286,27 @@ public class PinotInstanceAssignmentRestletResourceTest 
extends ControllerTest {
             consumingInstancePartitions.toJsonString()));
     assertEquals(instancePartitionsMap.size(), 1);
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
-        Collections.singletonList(realtimeInstanceId));
+        Collections.singletonList(consumingInstanceId));
 
     // OFFLINE instance partitions should have OFFLINE instance, CONSUMING and 
COMPLETED instance partitions should have
-    // REALTIME instance
+    // CONSUMING instance
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 3);
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0,
 0),
         Collections.singletonList(offlineInstanceId));
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
 0),
-        Collections.singletonList(realtimeInstanceId));
+        Collections.singletonList(consumingInstanceId));
     
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
 0),
-        Collections.singletonList(realtimeInstanceId));
+        Collections.singletonList(consumingInstanceId));
 
-    // Delete the OFFLINE table
+    // Delete the offline table
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
     instancePartitionsMap = getInstancePartitionsMap();
     assertEquals(instancePartitionsMap.size(), 2);
     
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
     
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
 
-    // Delete the REALTIME table
+    // Delete the real-time table
     _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
     try {
       getInstancePartitionsMap();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index fec7a73..9a7e73e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -31,7 +31,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfig;
 import org.apache.pinot.common.config.instance.InstanceAssignmentConfigUtils;
-import org.apache.pinot.common.config.instance.InstanceReplicaPartitionConfig;
+import 
org.apache.pinot.common.config.instance.InstanceReplicaGroupPartitionConfig;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import 
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
@@ -57,9 +57,9 @@ public class InstanceAssignmentTest {
         new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME)
             .setNumReplicas(numReplicas)
             
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
-    int numInstancesPerReplica = 2;
+    int numInstancesPerPartition = 2;
     ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new 
ReplicaGroupStrategyConfig();
-    
replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerReplica);
+    
replicaGroupStrategyConfig.setNumInstancesPerPartition(numInstancesPerPartition);
     
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
     int numInstances = 10;
@@ -70,9 +70,9 @@ public class InstanceAssignmentTest {
       instanceConfigs.add(instanceConfig);
     }
 
-    // Instances should be assigned to 3 replicas with a round-robin fashion, each with 2 instances
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 2 instances
     InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     // Instances of index 4 to 7 are not assigned because of the hash-based rotation
     // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
@@ -92,10 +92,10 @@ public class InstanceAssignmentTest {
         Collections.singletonMap(partitionColumnName, new 
ColumnPartitionConfig("Modulo", numPartitions)));
     
tableConfig.getIndexingConfig().setSegmentPartitionConfig(segmentPartitionConfig);
 
-    // Instances should be assigned to 3 replicas with a round-robin fashion, each with 3 instances, then these 3
+    // Instances should be assigned to 3 replica-groups with a round-robin fashion, each with 3 instances, then these 3
     // instances should be assigned to 2 partitions, each with 2 instances
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
     assertEquals(instancePartitions.getNumPartitions(), numPartitions);
     // Instance of index 7 is not assigned because of the hash-based rotation
     // Math.abs("myTable_OFFLINE".hashCode()) % 10 = 8
@@ -147,12 +147,12 @@ public class InstanceAssignmentTest {
     tagPoolConfig.setPoolBased(true);
     tagPoolConfig.setNumPools(numPools);
     assignmentConfig.setTagPoolConfig(tagPoolConfig);
-    // Assign to 2 replicas so that each replica is assigned to one pool
-    int numReplicas = numPools;
-    InstanceReplicaPartitionConfig replicaPartitionConfig = new 
InstanceReplicaPartitionConfig();
+    // Assign to 2 replica-groups so that each replica-group is assigned to one pool
+    int numReplicaGroups = numPools;
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig = new 
InstanceReplicaGroupPartitionConfig();
     replicaPartitionConfig.setReplicaGroupBased(true);
-    replicaPartitionConfig.setNumReplicas(numReplicas);
-    assignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig);
+    replicaPartitionConfig.setNumReplicaGroups(numReplicaGroups);
+    assignmentConfig.setReplicaGroupPartitionConfig(replicaPartitionConfig);
 
     TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
         
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
 assignmentConfig))
@@ -160,10 +160,10 @@ public class InstanceAssignmentTest {
     InstanceAssignmentDriver driver = new 
InstanceAssignmentDriver(tableConfig);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
-    // All instances in pool 0 should be assigned to replica 0, and all instances in pool 1 should be assigned to
-    // replica 1
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
     InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
         .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
@@ -187,10 +187,10 @@ public class InstanceAssignmentTest {
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // Pool 0 and 2 will be selected in the pool selection
-    // All instances in pool 0 should be assigned to replica 0, and all instances in pool 2 should be assigned to
-    // replica 1
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 2 should be assigned to
+    // replica-group 1
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
         .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
@@ -203,10 +203,10 @@ public class InstanceAssignmentTest {
     tagPoolConfig.setNumPools(numPools);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
-    // All instances in pool 2 should be assigned to replica 0, and all instances in pool 0 should be assigned to
-    // replica 1
+    // All instances in pool 2 should be assigned to replica-group 0, and all instances in pool 0 should be assigned to
+    // replica-group 1
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
         .asList(SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 
11, SERVER_INSTANCE_ID_PREFIX + 12,
@@ -219,10 +219,10 @@ public class InstanceAssignmentTest {
     tagPoolConfig.setPools(Arrays.asList(0, 1));
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
-    // All instances in pool 0 should be assigned to replica 0, and all instances in pool 1 should be assigned to
-    // replica 1
+    // All instances in pool 0 should be assigned to replica-group 0, and all instances in pool 1 should be assigned to
+    // replica-group 1
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0), Arrays
         .asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 1, 
SERVER_INSTANCE_ID_PREFIX + 2,
@@ -231,22 +231,22 @@ public class InstanceAssignmentTest {
         .asList(SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 6, 
SERVER_INSTANCE_ID_PREFIX + 7,
             SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 9));
 
-    // Assign instances from 2 pools to 3 replicas
-    numReplicas = numPools;
-    replicaPartitionConfig.setNumReplicas(numReplicas);
+    // Assign instances from 2 pools to 3 replica-groups
+    numReplicaGroups = numPools;
+    replicaPartitionConfig.setNumReplicaGroups(numReplicaGroups);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
     // [pool0, pool1]
     //  r0     r1
-    //  r0
-    // Each replica should have 2 instances assigned
+    //  r2
+    // Each replica-group should have 2 instances assigned
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
     // pool 0: [i3, i4, i0, i1, i2]
     //          r0  r2  r0  r2
     // pool 1: [i8, i9, i5, i6, i7]
     //          r1  r1
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), numReplicas);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
@@ -314,20 +314,20 @@ public class InstanceAssignmentTest {
       instanceConfig.addTag(OFFLINE_TAG);
     }
 
-    // No instance replica/partition config
+    // No instance replica-group/partition config
     try {
       driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
       fail();
     } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Instance replica/partition config is missing");
+      assertEquals(e.getMessage(), "Instance replica-group/partition config is missing");
     }
 
-    InstanceReplicaPartitionConfig replicaPartitionConfig = new 
InstanceReplicaPartitionConfig();
-    assignmentConfig.setReplicaPartitionConfig(replicaPartitionConfig);
+    InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig = new 
InstanceReplicaGroupPartitionConfig();
+    
assignmentConfig.setReplicaGroupPartitionConfig(replicaGroupPartitionConfig);
 
-    // All instances should be assigned as replica 0 partition 0
+    // All instances should be assigned as replica-group 0 partition 0
     InstancePartitions instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 1);
+    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     List<String> expectedInstances = new ArrayList<>(numInstances);
     for (int i = 0; i < numInstances; i++) {
@@ -356,9 +356,9 @@ public class InstanceAssignmentTest {
     }
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
-    // All instances in pool 0 should be assigned as replica 0 partition 0
+    // All instances in pool 0 should be assigned as replica-group 0 partition 0
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 1);
+    assertEquals(instancePartitions.getNumReplicaGroups(), 1);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     expectedInstances.clear();
     for (int i = 0; i < numInstances / 2; i++) {
@@ -388,7 +388,7 @@ public class InstanceAssignmentTest {
     }
 
     tagPoolConfig.setPools(null);
-    replicaPartitionConfig.setNumInstances(6);
+    replicaGroupPartitionConfig.setNumInstances(6);
 
     // Ask for too many instances
     try {
@@ -398,32 +398,32 @@ public class InstanceAssignmentTest {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
     }
 
-    replicaPartitionConfig.setNumInstances(0);
+    replicaGroupPartitionConfig.setNumInstances(0);
 
     // Enable replica-group
-    replicaPartitionConfig.setReplicaGroupBased(true);
+    replicaGroupPartitionConfig.setReplicaGroupBased(true);
 
-    // Number of replicas must be positive
+    // Number of replica-groups must be positive
     try {
       driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
       fail();
     } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Number of replicas must be positive");
+      assertEquals(e.getMessage(), "Number of replica-groups must be positive");
     }
 
-    replicaPartitionConfig.setNumReplicas(11);
+    replicaGroupPartitionConfig.setNumReplicaGroups(11);
 
-    // Ask for too many replicas
+    // Ask for too many replica-groups
     try {
       driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
-          "Not enough qualified instances from pool: 0, cannot select 6 replicas from 5 instances");
+          "Not enough qualified instances from pool: 0, cannot select 6 replica-groups from 5 instances");
     }
 
-    replicaPartitionConfig.setNumReplicas(3);
-    replicaPartitionConfig.setNumInstancesPerReplica(3);
+    replicaGroupPartitionConfig.setNumReplicaGroups(3);
+    replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(3);
 
     // Ask for too many instances
     try {
@@ -433,8 +433,8 @@ public class InstanceAssignmentTest {
       assertEquals(e.getMessage(), "Not enough qualified instances from pool: 0 (5 in the pool, asked for 6)");
     }
 
-    replicaPartitionConfig.setNumInstancesPerReplica(2);
-    replicaPartitionConfig.setNumInstancesPerPartition(3);
+    replicaGroupPartitionConfig.setNumInstancesPerReplicaGroup(2);
+    replicaGroupPartitionConfig.setNumInstancesPerPartition(3);
 
     // Ask for too many instances per partition
     try {
@@ -442,10 +442,10 @@ public class InstanceAssignmentTest {
       fail();
     } catch (IllegalStateException e) {
       assertEquals(e.getMessage(),
-          "Number of instances per partition: 3 must be smaller or equal to number of instances per replica: 2");
+          "Number of instances per partition: 3 must be smaller or equal to number of instances per replica-group: 2");
     }
 
-    replicaPartitionConfig.setNumInstancesPerPartition(0);
+    replicaGroupPartitionConfig.setNumInstancesPerPartition(0);
 
     // Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
     // pool0: [i3, i4, i0, i1, i2]
@@ -453,7 +453,7 @@ public class InstanceAssignmentTest {
     // pool1: [i8, i9, i5, i6, i7]
     //         r1  r1
     instancePartitions = 
driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs);
-    assertEquals(instancePartitions.getNumReplicas(), 3);
+    assertEquals(instancePartitions.getNumReplicaGroups(), 3);
     assertEquals(instancePartitions.getNumPartitions(), 1);
     assertEquals(instancePartitions.getInstances(0, 0),
         Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX 
+ 3));
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
index 089c016..6b26f8e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentTest.java
@@ -93,14 +93,14 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
     // }
     InstancePartitions instancePartitionsWithoutPartition =
         new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION);
-    int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS;
+    int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
     int instanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> instancesForReplica = new 
ArrayList<>(numInstancesPerReplica);
-      for (int i = 0; i < numInstancesPerReplica; i++) {
-        instancesForReplica.add(INSTANCES.get(instanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> instancesForReplicaGroup = new 
ArrayList<>(numInstancesPerReplicaGroup);
+      for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+        instancesForReplicaGroup.add(INSTANCES.get(instanceIdToAdd++));
       }
-      instancePartitionsWithoutPartition.setInstances(0, replicaId, 
instancesForReplica);
+      instancePartitionsWithoutPartition.setInstances(0, replicaGroupId, 
instancesForReplicaGroup);
     }
     _instancePartitionsMapWithoutPartition =
         Collections.singletonMap(InstancePartitionsType.OFFLINE, 
instancePartitionsWithoutPartition);
@@ -128,13 +128,13 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
     HelixManager helixManagerWithPartitions = mock(HelixManager.class);
     
when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions);
 
-    ReplicaGroupStrategyConfig strategyConfig = new 
ReplicaGroupStrategyConfig();
-    strategyConfig.setPartitionColumn(PARTITION_COLUMN);
+    ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new 
ReplicaGroupStrategyConfig();
+    replicaGroupStrategyConfig.setPartitionColumn(PARTITION_COLUMN);
     TableConfig tableConfigWithPartitions =
         new 
TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION)
             .setNumReplicas(NUM_REPLICAS)
             
.setSegmentAssignmentStrategy(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY).build();
-    
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig);
+    
tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
     _segmentAssignmentWithPartition =
         
SegmentAssignmentFactory.getSegmentAssignment(helixManagerWithPartitions, 
tableConfigWithPartitions);
 
@@ -151,15 +151,15 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
     // }
     InstancePartitions instancePartitionsWithPartition =
         new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION);
-    int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS;
+    int numInstancesPerPartition = numInstancesPerReplicaGroup / NUM_REPLICAS;
     instanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
       for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) {
         List<String> instancesForPartition = new 
ArrayList<>(numInstancesPerPartition);
         for (int i = 0; i < numInstancesPerPartition; i++) {
           instancesForPartition.add(INSTANCES.get(instanceIdToAdd++));
         }
-        instancePartitionsWithPartition.setInstances(partitionId, replicaId, 
instancesForPartition);
+        instancePartitionsWithPartition.setInstances(partitionId, 
replicaGroupId, instancesForPartition);
       }
     }
     _instancePartitionsMapWithPartition =
@@ -174,27 +174,29 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
 
   @Test
   public void testAssignSegmentWithoutPartition() {
-    int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS;
+    int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = SEGMENTS.get(segmentId);
       List<String> instancesAssigned = _segmentAssignmentWithoutPartition
           .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithoutPartition);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
-      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
-        // Segment 0 should be assigned to instance 0, 6, 12
-        // Segment 1 should be assigned to instance 1, 7, 13
-        // Segment 2 should be assigned to instance 2, 8, 14
-        // Segment 3 should be assigned to instance 3, 9, 15
-        // Segment 4 should be assigned to instance 4, 10, 16
-        // Segment 5 should be assigned to instance 5, 11, 17
-        // Segment 6 should be assigned to instance 0, 6, 12
-        // Segment 7 should be assigned to instance 1, 7, 13
-        // ...
-        int expectedAssignedInstanceId = segmentId % numInstancesPerReplica + 
replicaId * numInstancesPerReplica;
-        assertEquals(instancesAssigned.get(replicaId), 
INSTANCES.get(expectedAssignedInstanceId));
+      // Segment 0 should be assigned to instance 0, 6, 12
+      // Segment 1 should be assigned to instance 1, 7, 13
+      // Segment 2 should be assigned to instance 2, 8, 14
+      // Segment 3 should be assigned to instance 3, 9, 15
+      // Segment 4 should be assigned to instance 4, 10, 16
+      // Segment 5 should be assigned to instance 5, 11, 17
+      // Segment 6 should be assigned to instance 0, 6, 12
+      // Segment 7 should be assigned to instance 1, 7, 13
+      // ...
+      for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+        int expectedAssignedInstanceId =
+            segmentId % numInstancesPerReplicaGroup + replicaGroupId * 
numInstancesPerReplicaGroup;
+        assertEquals(instancesAssigned.get(replicaGroupId), 
INSTANCES.get(expectedAssignedInstanceId));
       }
+
       currentAssignment.put(segmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentOnlineOfflineStateModel.ONLINE));
     }
@@ -202,31 +204,32 @@ public class OfflineReplicaGroupSegmentAssignmentTest {
 
   @Test
   public void testAssignSegmentWithPartition() {
-    int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS;
+    int numInstancesPerReplicaGroup = NUM_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
-    int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS;
+    int numInstancesPerPartition = numInstancesPerReplicaGroup / 
NUM_PARTITIONS;
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = SEGMENTS.get(segmentId);
       List<String> instancesAssigned = _segmentAssignmentWithPartition
           .assignSegment(segmentName, currentAssignment, 
_instancePartitionsMapWithPartition);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
-      int partitionId = segmentId % NUM_PARTITIONS;
-      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
-        // Segment 0 (partition 0) should be assigned to instance 0, 6, 12
-        // Segment 1 (partition 1) should be assigned to instance 2, 8, 14
-        // Segment 2 (partition 2) should be assigned to instance 4, 10, 16
-        // Segment 3 (partition 0) should be assigned to instance 1, 7, 13
-        // Segment 4 (partition 1) should be assigned to instance 3, 9, 15
-        // Segment 5 (partition 2) should be assigned to instance 5, 11, 17
-        // Segment 6 (partition 0) should be assigned to instance 0, 6, 12
-        // Segment 7 (partition 1) should be assigned to instance 2, 8, 14
-        // ...
+      // Segment 0 (partition 0) should be assigned to instance 0, 6, 12
+      // Segment 1 (partition 1) should be assigned to instance 2, 8, 14
+      // Segment 2 (partition 2) should be assigned to instance 4, 10, 16
+      // Segment 3 (partition 0) should be assigned to instance 1, 7, 13
+      // Segment 4 (partition 1) should be assigned to instance 3, 9, 15
+      // Segment 5 (partition 2) should be assigned to instance 5, 11, 17
+      // Segment 6 (partition 0) should be assigned to instance 0, 6, 12
+      // Segment 7 (partition 1) should be assigned to instance 2, 8, 14
+      // ...
+      int partitionId = segmentId % NUM_PARTITIONS;
+      for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
         int expectedAssignedInstanceId =
-            (segmentId % numInstancesPerReplica) / NUM_PARTITIONS + 
partitionId * numInstancesPerPartition
-                + replicaId * numInstancesPerReplica;
-        assertEquals(instancesAssigned.get(replicaId), 
INSTANCES.get(expectedAssignedInstanceId));
+            (segmentId % numInstancesPerReplicaGroup) / NUM_PARTITIONS + 
partitionId * numInstancesPerPartition
+                + replicaGroupId * numInstancesPerReplicaGroup;
+        assertEquals(instancesAssigned.get(replicaGroupId), 
INSTANCES.get(expectedAssignedInstanceId));
       }
+
       currentAssignment.put(segmentName,
           SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentOnlineOfflineStateModel.ONLINE));
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
index 5fee01a..86bd4ca 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java
@@ -105,19 +105,20 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest 
{
       List<String> instancesAssigned =
           _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
-      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
 
-        // Segment 0 (partition 0) should be assigned to instance 0, 1, 2
-        // Segment 1 (partition 1) should be assigned to instance 3, 4, 5
-        // Segment 2 (partition 2) should be assigned to instance 6, 7, 8
-        // Segment 3 (partition 3) should be assigned to instance 0, 1, 2
-        // Segment 4 (partition 0) should be assigned to instance 0, 1, 2
-        // Segment 5 (partition 1) should be assigned to instance 3, 4, 5
-        // ...
-        int partitionId = segmentId % NUM_PARTITIONS;
+      // Segment 0 (partition 0) should be assigned to instance 0, 1, 2
+      // Segment 1 (partition 1) should be assigned to instance 3, 4, 5
+      // Segment 2 (partition 2) should be assigned to instance 6, 7, 8
+      // Segment 3 (partition 3) should be assigned to instance 0, 1, 2
+      // Segment 4 (partition 0) should be assigned to instance 0, 1, 2
+      // Segment 5 (partition 1) should be assigned to instance 3, 4, 5
+      // ...
+      int partitionId = segmentId % NUM_PARTITIONS;
+      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
         int expectedAssignedInstanceId = (partitionId * NUM_REPLICAS + 
replicaId) % NUM_CONSUMING_INSTANCES;
         assertEquals(instancesAssigned.get(replicaId), 
CONSUMING_INSTANCES.get(expectedAssignedInstanceId));
       }
+
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
index 04fc344..0fc60df 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java
@@ -85,14 +85,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     //        p0          p1          p2
     //        p3
     InstancePartitions consumingInstancePartitions = new 
InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
-    int numConsumingInstancesPerReplica = NUM_CONSUMING_INSTANCES / 
NUM_REPLICAS;
+    int numConsumingInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / 
NUM_REPLICAS;
     int consumingInstanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> consumingInstancesForReplica = new 
ArrayList<>(numConsumingInstancesPerReplica);
-      for (int i = 0; i < numConsumingInstancesPerReplica; i++) {
-        
consumingInstancesForReplica.add(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> consumingInstancesForReplicaGroup = new 
ArrayList<>(numConsumingInstancesPerReplicaGroup);
+      for (int i = 0; i < numConsumingInstancesPerReplicaGroup; i++) {
+        
consumingInstancesForReplicaGroup.add(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++));
       }
-      consumingInstancePartitions.setInstances(0, replicaId, 
consumingInstancesForReplica);
+      consumingInstancePartitions.setInstances(0, replicaGroupId, 
consumingInstancesForReplicaGroup);
     }
     _instancePartitionsMap.put(InstancePartitionsType.CONSUMING, 
consumingInstancePartitions);
 
@@ -103,14 +103,14 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
     //   0_2=[instance_8, instance_9, instance_10, instance_11]
     // }
     InstancePartitions completedInstancePartitions = new 
InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME);
-    int numCompletedInstancesPerReplica = NUM_COMPLETED_INSTANCES / 
NUM_REPLICAS;
+    int numCompletedInstancesPerReplicaGroup = NUM_COMPLETED_INSTANCES / 
NUM_REPLICAS;
     int completedInstanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> completedInstancesForReplica = new 
ArrayList<>(numCompletedInstancesPerReplica);
-      for (int i = 0; i < numCompletedInstancesPerReplica; i++) {
-        
completedInstancesForReplica.add(COMPLETED_INSTANCES.get(completedInstanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> completedInstancesForReplicaGroup = new 
ArrayList<>(numCompletedInstancesPerReplicaGroup);
+      for (int i = 0; i < numCompletedInstancesPerReplicaGroup; i++) {
+        
completedInstancesForReplicaGroup.add(COMPLETED_INSTANCES.get(completedInstanceIdToAdd++));
       }
-      completedInstancePartitions.setInstances(0, replicaId, 
completedInstancesForReplica);
+      completedInstancePartitions.setInstances(0, replicaGroupId, 
completedInstancesForReplicaGroup);
     }
     _instancePartitionsMap.put(InstancePartitionsType.COMPLETED, 
completedInstancePartitions);
   }
@@ -122,26 +122,28 @@ public class RealtimeReplicaGroupSegmentAssignmentTest {
 
   @Test
   public void testAssignSegment() {
-    int numInstancesPerReplica = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
+    int numInstancesPerReplicaGroup = NUM_CONSUMING_INSTANCES / NUM_REPLICAS;
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
       String segmentName = _segments.get(segmentId);
       List<String> instancesAssigned =
           _segmentAssignment.assignSegment(segmentName, currentAssignment, 
_instancePartitionsMap);
       assertEquals(instancesAssigned.size(), NUM_REPLICAS);
-      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-
-        // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
-        // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
-        // Segment 2 (partition 2) should be assigned to instance 2, 5, 8
-        // Segment 3 (partition 3) should be assigned to instance 0, 3, 6
-        // Segment 4 (partition 0) should be assigned to instance 0, 3, 6
-        // Segment 5 (partition 1) should be assigned to instance 1, 4, 7
-        // ...
+
+      // Segment 0 (partition 0) should be assigned to instance 0, 3, 6
+      // Segment 1 (partition 1) should be assigned to instance 1, 4, 7
+      // Segment 2 (partition 2) should be assigned to instance 2, 5, 8
+      // Segment 3 (partition 3) should be assigned to instance 0, 3, 6
+      // Segment 4 (partition 0) should be assigned to instance 0, 3, 6
+      // Segment 5 (partition 1) should be assigned to instance 1, 4, 7
+      // ...
+      for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
         int partitionId = segmentId % NUM_PARTITIONS;
-        int expectedAssignedInstanceId = partitionId % numInstancesPerReplica 
+ replicaId * numInstancesPerReplica;
-        assertEquals(instancesAssigned.get(replicaId), 
CONSUMING_INSTANCES.get(expectedAssignedInstanceId));
+        int expectedAssignedInstanceId =
+            partitionId % numInstancesPerReplicaGroup + replicaGroupId * 
numInstancesPerReplicaGroup;
+        assertEquals(instancesAssigned.get(replicaGroupId), 
CONSUMING_INSTANCES.get(expectedAssignedInstanceId));
       }
+
       addToAssignment(currentAssignment, segmentId, instancesAssigned);
     }
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
index 86cfac6..ef746e8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
@@ -217,28 +217,28 @@ public class SegmentAssignmentUtilsTest {
     //   0_2=[instance_6, instance_7, instance_8]
     // }
     InstancePartitions instancePartitions = new InstancePartitions(null);
-    int numInstancesPerReplica = numInstances / NUM_REPLICAS;
+    int numInstancesPerReplicaGroup = numInstances / NUM_REPLICAS;
     int instanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> instancesForReplica = new 
ArrayList<>(numInstancesPerReplica);
-      for (int i = 0; i < numInstancesPerReplica; i++) {
-        instancesForReplica.add(instances.get(instanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> instancesForReplicaGroup = new 
ArrayList<>(numInstancesPerReplicaGroup);
+      for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+        instancesForReplicaGroup.add(instances.get(instanceIdToAdd++));
       }
-      instancePartitions.setInstances(0, replicaId, instancesForReplica);
+      instancePartitions.setInstances(0, replicaGroupId, 
instancesForReplicaGroup);
     }
 
     // Uniformly spray segments to the instances:
-    // Replica group 0: [instance_0, instance_1, instance_2],
-    // Replica group 1: [instance_3, instance_4, instance_5],
-    // Replica group 2: [instance_6, instance_7, instance_8]
+    // Replica-group 0: [instance_0, instance_1, instance_2],
+    // Replica-group 1: [instance_3, instance_4, instance_5],
+    // Replica-group 2: [instance_6, instance_7, instance_8]
     //                   segment_0   segment_1   segment_2
     //                   segment_3   segment_4   segment_5
     //                   ...
     Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
     for (int segmentId = 0; segmentId < numSegments; segmentId++) {
       List<String> instancesAssigned = new ArrayList<>(NUM_REPLICAS);
-      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-        int assignedInstanceId = segmentId % numInstancesPerReplica + 
replicaId * numInstancesPerReplica;
+      for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+        int assignedInstanceId = segmentId % numInstancesPerReplicaGroup + 
replicaGroupId * numInstancesPerReplicaGroup;
         instancesAssigned.add(instances.get(assignedInstanceId));
       }
       currentAssignment.put(segments.get(segmentId),
@@ -270,20 +270,20 @@ public class SegmentAssignmentUtilsTest {
     //   0_2=[instance_6, instance_7, instance_8]
     // }
     List<String> newInstances = new ArrayList<>(numInstances);
-    List<String> newReplica0Instances = new 
ArrayList<>(instancePartitions.getInstances(0, 0));
-    String newReplica0Instance = INSTANCE_NAME_PREFIX + 9;
-    newReplica0Instances.set(0, newReplica0Instance);
-    newInstances.addAll(newReplica0Instances);
-    List<String> newReplica1Instances = new 
ArrayList<>(instancePartitions.getInstances(0, 1));
-    String newReplica1Instance = INSTANCE_NAME_PREFIX + 10;
-    newReplica1Instances.set(1, newReplica1Instance);
-    newInstances.addAll(newReplica1Instances);
-    List<String> newReplica2Instances = instancePartitions.getInstances(0, 2);
-    newInstances.addAll(newReplica2Instances);
+    List<String> newReplicaGroup0Instances = new 
ArrayList<>(instancePartitions.getInstances(0, 0));
+    String newReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 9;
+    newReplicaGroup0Instances.set(0, newReplicaGroup0Instance);
+    newInstances.addAll(newReplicaGroup0Instances);
+    List<String> newReplicaGroup1Instances = new 
ArrayList<>(instancePartitions.getInstances(0, 1));
+    String newReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 10;
+    newReplicaGroup1Instances.set(1, newReplicaGroup1Instance);
+    newInstances.addAll(newReplicaGroup1Instances);
+    List<String> newReplicaGroup2Instances = 
instancePartitions.getInstances(0, 2);
+    newInstances.addAll(newReplicaGroup2Instances);
     InstancePartitions newInstancePartitions = new InstancePartitions(null);
-    newInstancePartitions.setInstances(0, 0, newReplica0Instances);
-    newInstancePartitions.setInstances(0, 1, newReplica1Instances);
-    newInstancePartitions.setInstances(0, 2, newReplica2Instances);
+    newInstancePartitions.setInstances(0, 0, newReplicaGroup0Instances);
+    newInstancePartitions.setInstances(0, 1, newReplicaGroup1Instances);
+    newInstancePartitions.setInstances(0, 2, newReplicaGroup2Instances);
     Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
         .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
     // There should be 90 segments assigned
@@ -301,34 +301,34 @@ public class SegmentAssignmentUtilsTest {
     Map<String, Integer> numSegmentsToBeMovedPerInstance =
         
SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, 
newAssignment);
     assertEquals(numSegmentsToBeMovedPerInstance.size(), 2);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplica0Instance), 
numSegmentsPerInstance);
-    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplica1Instance), 
numSegmentsPerInstance);
-    String replica0OldInstanceName = INSTANCE_NAME_PREFIX + 0;
-    String replica1OldInstanceName = INSTANCE_NAME_PREFIX + 4;
+    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplicaGroup0Instance), 
numSegmentsPerInstance);
+    assertEquals((int) 
numSegmentsToBeMovedPerInstance.get(newReplicaGroup1Instance), 
numSegmentsPerInstance);
+    String oldReplicaGroup0Instance = INSTANCE_NAME_PREFIX + 0;
+    String oldReplicaGroup1Instance = INSTANCE_NAME_PREFIX + 4;
     for (String segmentName : segments) {
       Map<String, String> oldInstanceStateMap = 
currentAssignment.get(segmentName);
-      if (oldInstanceStateMap.containsKey(replica0OldInstanceName)) {
-        
assertTrue(newAssignment.get(segmentName).containsKey(newReplica0Instance));
+      if (oldInstanceStateMap.containsKey(oldReplicaGroup0Instance)) {
+        
assertTrue(newAssignment.get(segmentName).containsKey(newReplicaGroup0Instance));
       }
-      if (oldInstanceStateMap.containsKey(replica1OldInstanceName)) {
-        
assertTrue(newAssignment.get(segmentName).containsKey(newReplica1Instance));
+      if (oldInstanceStateMap.containsKey(oldReplicaGroup1Instance)) {
+        
assertTrue(newAssignment.get(segmentName).containsKey(newReplicaGroup1Instance));
       }
     }
 
-    // Remove 3 instances (1 from each replica)
+    // Remove 3 instances (1 from each replica-group)
     // {
     //   0_0=[instance_0, instance_1],
     //   0_1=[instance_3, instance_4],
     //   0_2=[instance_6, instance_7]
     // }
     int newNumInstances = numInstances - 3;
-    int newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS;
+    int newNumInstancesPerReplicaGroup = newNumInstances / NUM_REPLICAS;
     newInstances = new ArrayList<>(newNumInstances);
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> newInstancesForReplica =
-          instancePartitions.getInstances(0, replicaId).subList(0, 
newNumInstancesPerReplica);
-      newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica);
-      newInstances.addAll(newInstancesForReplica);
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> newInstancesForReplicaGroup =
+          instancePartitions.getInstances(0, replicaGroupId).subList(0, 
newNumInstancesPerReplicaGroup);
+      newInstancePartitions.setInstances(0, replicaGroupId, 
newInstancesForReplicaGroup);
+      newInstances.addAll(newInstancesForReplicaGroup);
     }
     newAssignment = SegmentAssignmentUtils
         .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
@@ -354,22 +354,22 @@ public class SegmentAssignmentUtilsTest {
           newNumSegmentsPerInstance - numSegmentsPerInstance);
     }
 
-    // Add 6 instances (2 to each replica)
+    // Add 6 instances (2 to each replica-group)
     // {
     //   0_0=[instance_0, instance_1, instance_2, instance_9, instance_10],
     //   0_1=[instance_3, instance_4, instance_5, instance_11, instance_12],
     //   0_2=[instance_6, instance_7, instance_8, instance_13, instance_14]
     // }
     newNumInstances = numInstances + 6;
-    newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS;
+    newNumInstancesPerReplicaGroup = newNumInstances / NUM_REPLICAS;
     newInstances = 
SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances);
     instanceIdToAdd = numInstances;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> newInstancesForReplica = new 
ArrayList<>(instancePartitions.getInstances(0, replicaId));
-      for (int i = 0; i < newNumInstancesPerReplica - numInstancesPerReplica; 
i++) {
-        newInstancesForReplica.add(newInstances.get(instanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> newInstancesForReplicaGroup = new 
ArrayList<>(instancePartitions.getInstances(0, replicaGroupId));
+      for (int i = 0; i < newNumInstancesPerReplicaGroup - 
numInstancesPerReplicaGroup; i++) {
+        newInstancesForReplicaGroup.add(newInstances.get(instanceIdToAdd++));
       }
-      newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica);
+      newInstancePartitions.setInstances(0, replicaGroupId, 
newInstancesForReplicaGroup);
     }
     newAssignment = SegmentAssignmentUtils
         .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);
@@ -402,12 +402,12 @@ public class SegmentAssignmentUtilsTest {
     // }
     newInstances = SegmentAssignmentTestUtils.getNameList("i_", numInstances);
     instanceIdToAdd = 0;
-    for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
-      List<String> instancesForReplica = new 
ArrayList<>(numInstancesPerReplica);
-      for (int i = 0; i < numInstancesPerReplica; i++) {
-        instancesForReplica.add(newInstances.get(instanceIdToAdd++));
+    for (int replicaGroupId = 0; replicaGroupId < NUM_REPLICAS; 
replicaGroupId++) {
+      List<String> instancesForReplicaGroup = new 
ArrayList<>(numInstancesPerReplicaGroup);
+      for (int i = 0; i < numInstancesPerReplicaGroup; i++) {
+        instancesForReplicaGroup.add(newInstances.get(instanceIdToAdd++));
       }
-      newInstancePartitions.setInstances(0, replicaId, instancesForReplica);
+      newInstancePartitions.setInstances(0, replicaGroupId, 
instancesForReplicaGroup);
     }
     newAssignment = SegmentAssignmentUtils
         .rebalanceReplicaGroupBasedTable(currentAssignment, 
newInstancePartitions, partitionIdToSegmentsMap);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to