apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1989856233


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -365,14 +345,14 @@ public void testNewContext() {
             new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
 
         // Test generating a throttled response for a subsequent share session
-        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, 
Collections.emptyMap(), EMPTY_PART_LIST,
+        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, 
Collections.emptyList(), EMPTY_PART_LIST,

Review Comment:
   ```suggestion
           ShareFetchContext context7 = 
sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -458,7 +434,7 @@ public void testShareSessionExpiration() {
         time.sleep(500);
 
         // Create a subsequent share fetch context for session 1
-        ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, Collections.emptyMap(), 
EMPTY_PART_LIST,
+        ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, Collections.emptyList(), 
EMPTY_PART_LIST,

Review Comment:
   ```suggestion
           ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
   ```



##########
server/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java:
##########
@@ -20,47 +20,48 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * Helper class to return the erroneous partitions and valid partition data
  */
 public class ErroneousAndValidPartitionData {
     private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
erroneous;
-    private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
validTopicIdPartitions;
+    private final List<TopicIdPartition> validTopicIdPartitions;
 
     public ErroneousAndValidPartitionData(Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData> erroneous,
-                                          Map<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> validTopicIdPartitions) {
+                                          List<TopicIdPartition> 
validTopicIdPartitions) {
         this.erroneous = erroneous;
         this.validTopicIdPartitions = validTopicIdPartitions;
     }
 
-    public ErroneousAndValidPartitionData(Map<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData) {
+    public ErroneousAndValidPartitionData(List<TopicIdPartition> 
shareFetchData) {
         erroneous = new HashMap<>();
-        validTopicIdPartitions = new HashMap<>();
-        shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
+        validTopicIdPartitions = new ArrayList<>();
+        shareFetchData.forEach(topicIdPartition -> {
             if (topicIdPartition.topic() == null) {
                 erroneous.put(topicIdPartition, 
ShareFetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_ID));
             } else {
-                validTopicIdPartitions.put(topicIdPartition, 
sharePartitionData);
+                validTopicIdPartitions.add(topicIdPartition);
             }
         });
     }
 
     public ErroneousAndValidPartitionData() {
         this.erroneous = new HashMap<>();
-        this.validTopicIdPartitions = new HashMap<>();
+        this.validTopicIdPartitions = new ArrayList<>();

Review Comment:
   Do we modify `erroneous` or `validTopicIdPartitions` after initializing 
using default constructor. I am thinking is it correct to have mutable 
variables?



##########
server/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java:
##########
@@ -20,47 +20,48 @@
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.ShareFetchRequest;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
  * Helper class to return the erroneous partitions and valid partition data
  */
 public class ErroneousAndValidPartitionData {
     private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
erroneous;
-    private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> 
validTopicIdPartitions;
+    private final List<TopicIdPartition> validTopicIdPartitions;
 
     public ErroneousAndValidPartitionData(Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData> erroneous,
-                                          Map<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> validTopicIdPartitions) {
+                                          List<TopicIdPartition> 
validTopicIdPartitions) {
         this.erroneous = erroneous;
         this.validTopicIdPartitions = validTopicIdPartitions;
     }
 
-    public ErroneousAndValidPartitionData(Map<TopicIdPartition, 
ShareFetchRequest.SharePartitionData> shareFetchData) {
+    public ErroneousAndValidPartitionData(List<TopicIdPartition> 
shareFetchData) {
         erroneous = new HashMap<>();
-        validTopicIdPartitions = new HashMap<>();
-        shareFetchData.forEach((topicIdPartition, sharePartitionData) -> {
+        validTopicIdPartitions = new ArrayList<>();
+        shareFetchData.forEach(topicIdPartition -> {
             if (topicIdPartition.topic() == null) {
                 erroneous.put(topicIdPartition, 
ShareFetchResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_ID));

Review Comment:
   Question, we are checking topic name here and filling error as 
`UNKNOWN_TOPIC_ID`, is it correct?



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -365,14 +345,14 @@ public void testNewContext() {
             new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true));
 
         // Test generating a throttled response for a subsequent share session
-        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, 
Collections.emptyMap(), EMPTY_PART_LIST,
+        ShareFetchContext context7 = sharePartitionManager.newContext(groupId, 
Collections.emptyList(), EMPTY_PART_LIST,
             new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true);
         ShareFetchResponse resp7 = context7.throttleResponse(100);
         assertEquals(Errors.NONE, resp7.error());
         assertEquals(100, resp7.throttleTimeMs());
 
         // Get the final share session.
-        ShareFetchContext context8 = sharePartitionManager.newContext(groupId, 
Collections.emptyMap(), EMPTY_PART_LIST,
+        ShareFetchContext context8 = sharePartitionManager.newContext(groupId, 
Collections.emptyList(), EMPTY_PART_LIST,

Review Comment:
   ```suggestion
           ShareFetchContext context8 = 
sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST,
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -201,17 +198,15 @@ public void 
testNewContextReturnsFinalContextWithoutRequestData() {
         Uuid memberId = Uuid.randomUuid();
 
         // Create a new share session with an initial share fetch request
-        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = 
new LinkedHashMap<>();
-        reqData1.put(tp0, new 
ShareFetchRequest.SharePartitionData(tp0.topicId(), PARTITION_MAX_BYTES));
-        reqData1.put(tp1, new 
ShareFetchRequest.SharePartitionData(tp1.topicId(), PARTITION_MAX_BYTES));
+        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
 
         ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.INITIAL_EPOCH);
         ShareFetchContext context1 = sharePartitionManager.newContext(groupId, 
reqData1, EMPTY_PART_LIST, reqMetadata1, false);
         assertInstanceOf(ShareSessionContext.class, context1);
         assertFalse(((ShareSessionContext) context1).isSubsequent());
 
         ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, 
ShareRequestMetadata.FINAL_EPOCH);
-        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, 
Collections.emptyMap(), Collections.emptyList(), reqMetadata2, true);
+        ShareFetchContext context2 = sharePartitionManager.newContext(groupId, 
Collections.emptyList(), Collections.emptyList(), reqMetadata2, true);

Review Comment:
   ```suggestion
           ShareFetchContext context2 = 
sharePartitionManager.newContext(groupId, List.of(), List.of(), reqMetadata2, 
true);
   ```



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -59,7 +59,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1))
     )
 
-    val shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, Map.empty)
+    val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)

Review Comment:
   Remove `MAX_PARTITION_BYTES` constant in the file.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -6083,8 +6083,8 @@ class ReplicaManagerTest {
     try {
       val groupId = "grp"
       val tp1 = new TopicIdPartition(Uuid.randomUuid, new 
TopicPartition("foo1", 0))
-      val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer]
-      partitionMaxBytes.put(tp1, 1000)
+      val topicPartitions = new util.ArrayList[TopicIdPartition]
+      topicPartitions.add(tp1)

Review Comment:
   Do we need mutable list? Else we can use `util.List.of`.



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1163,7 +1144,7 @@ public void 
testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() {
     public void testOnCompleteExecutionOnTimeout() {
         ShareFetch shareFetch = new ShareFetch(
             FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
-            new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            new CompletableFuture<>(), new ArrayList<>(), BATCH_SIZE, 
MAX_FETCH_RECORDS,

Review Comment:
   ```suggestion
               new CompletableFuture<>(), List.of(), BATCH_SIZE, 
MAX_FETCH_RECORDS,
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4095,9 +4089,8 @@ class KafkaApisTest extends Logging {
     )
 
     when(sharePartitionManager.newContext(any(), any(), any(), any(), 
any())).thenReturn(
-      new ShareSessionContext(new ShareRequestMetadata(memberId, 0), Map(
-        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
-          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      new ShareSessionContext(new ShareRequestMetadata(memberId, 0), List(
+        new TopicIdPartition(topicId, partitionIndex, topicName)
       ).asJava)

Review Comment:
   Use util.List.of



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,8 @@ static LinkedHashMap<TopicIdPartition, Integer> 
rotateRoundRobin(
             return topicIdPartitions;
         }
 
-        // TODO: Once the partition max bytes is removed then the partition 
will be a linked list and rotation
-        //  will be a simple operation. Else consider using 
ImplicitLinkedHashCollection.
-        LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new 
LinkedHashMap<>(rotateAt);
-        LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new 
LinkedHashMap<>(topicIdPartitions.size());
-        int i = 0;
-        for (Map.Entry<TopicIdPartition, Integer> entry : 
topicIdPartitions.entrySet()) {
-            if (i < rotateAt) {
-                suffixPartitions.put(entry.getKey(), entry.getValue());
-            } else {
-                rotatedPartitions.put(entry.getKey(), entry.getValue());
-            }
-            i++;
-        }
-        rotatedPartitions.putAll(suffixPartitions);
+        List<TopicIdPartition> rotatedPartitions = new 
ArrayList<>(topicIdPartitions);

Review Comment:
   Also please write comment that we don't want to modify the input list hence 
created a copy.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4044,7 +4039,6 @@ class KafkaApisTest extends Logging {
         setPartitions(List(
           new ShareFetchRequestData.FetchPartition()
             .setPartitionIndex(0)
-            .setPartitionMaxBytes(partitionMaxBytes)
         ).asJava)
       ).asJava)

Review Comment:
   ```
   shareFetchRequestData = new ShareFetchRequestData().
         setGroupId(groupId).
         setMemberId(memberId.toString).
         setShareSessionEpoch(-1).
         setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
           setTopicId(topicId).
           setPartitions(util.List.of(
             new ShareFetchRequestData.FetchPartition()
               .setPartitionIndex(0)
               .setAcknowledgementBatches(util.List.of(
                 new AcknowledgementBatch()
                   .setFirstOffset(0)
                   .setLastOffset(9)
                   .setAcknowledgeTypes(Collections.singletonList(1.toByte))
               ))
           ))
         ))



##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,8 @@ static LinkedHashMap<TopicIdPartition, Integer> 
rotateRoundRobin(
             return topicIdPartitions;
         }
 
-        // TODO: Once the partition max bytes is removed then the partition 
will be a linked list and rotation
-        //  will be a simple operation. Else consider using 
ImplicitLinkedHashCollection.
-        LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new 
LinkedHashMap<>(rotateAt);
-        LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new 
LinkedHashMap<>(topicIdPartitions.size());
-        int i = 0;
-        for (Map.Entry<TopicIdPartition, Integer> entry : 
topicIdPartitions.entrySet()) {
-            if (i < rotateAt) {
-                suffixPartitions.put(entry.getKey(), entry.getValue());
-            } else {
-                rotatedPartitions.put(entry.getKey(), entry.getValue());
-            }
-            i++;
-        }
-        rotatedPartitions.putAll(suffixPartitions);
+        List<TopicIdPartition> rotatedPartitions = new 
ArrayList<>(topicIdPartitions);
+        Collections.rotate(rotatedPartitions, -1 * rotateAt);

Review Comment:
   I think for elements [1,2,3] and rotate at 1 we want the output as [2,3,1] 
instead of [3,1,2] hence you negated. Can you please write comment regarding 
why `-1`?



##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
     @Test
     public void testRoundRobinStrategy() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
+        ArrayList<TopicIdPartition> partitions = createPartitions(3);

Review Comment:
   ```suggestion
           List<TopicIdPartition> partitions = createPartitions(3);
   ```



##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
     @Test
     public void testRoundRobinStrategy() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
+        ArrayList<TopicIdPartition> partitions = createPartitions(3);
 
-        LinkedHashMap<TopicIdPartition, Integer> result = 
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+        List<TopicIdPartition> result = strategy.rotate(partitions, new 
PartitionRotateMetadata(1));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // Session epoch is greater than the number of partitions.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 2);
+        validateRotatedListEquals(partitions, result, 2);
 
         // Session epoch is at Integer.MAX_VALUE.
         result = strategy.rotate(partitions, new 
PartitionRotateMetadata(Integer.MAX_VALUE));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // No rotation at same size as epoch.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithSpecialSessionEpochs() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
 
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+        ArrayList<TopicIdPartition> partitions = createPartitions(3);
+        List<TopicIdPartition> result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
 
         result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithEmptyPartitions() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
         // Empty partitions.
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new 
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+        List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new 
PartitionRotateMetadata(5));
         // The result should be empty.
         assertTrue(result.isEmpty());
     }
 
     /**
-     * Create an ordered map of TopicIdPartition to partition max bytes.
+     * Create an ordered set of topic partitions.
      * @param size The number of topic-partitions to create.
-     * @return The ordered map of TopicIdPartition to partition max bytes.
+     * @return The ordered set of topic partitions.
      */
-    private LinkedHashMap<TopicIdPartition, Integer> createPartitions(int 
size) {
-        LinkedHashMap<TopicIdPartition, Integer> partitions = new 
LinkedHashMap<>();
+    private ArrayList<TopicIdPartition> createPartitions(int size) {

Review Comment:
   ```suggestion
       private List<TopicIdPartition> createPartitions(int size) {
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4095,9 +4089,8 @@ class KafkaApisTest extends Logging {
     )
 
     when(sharePartitionManager.newContext(any(), any(), any(), any(), 
any())).thenReturn(
-      new ShareSessionContext(new ShareRequestMetadata(memberId, 0), Map(
-        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
-          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      new ShareSessionContext(new ShareRequestMetadata(memberId, 0), List(
+        new TopicIdPartition(topicId, partitionIndex, topicName)
       ).asJava)

Review Comment:
   And at other places below.



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -458,7 +434,7 @@ public void testShareSessionExpiration() {
         time.sleep(500);
 
         // Create a subsequent share fetch context for session 1
-        ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, Collections.emptyMap(), 
EMPTY_PART_LIST,
+        ShareFetchContext session1context2 = 
sharePartitionManager.newContext(groupId, Collections.emptyList(), 
EMPTY_PART_LIST,

Review Comment:
   And at other places changed in the PR.



##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
     @Test
     public void testRoundRobinStrategy() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
+        ArrayList<TopicIdPartition> partitions = createPartitions(3);
 
-        LinkedHashMap<TopicIdPartition, Integer> result = 
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+        List<TopicIdPartition> result = strategy.rotate(partitions, new 
PartitionRotateMetadata(1));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // Session epoch is greater than the number of partitions.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 2);
+        validateRotatedListEquals(partitions, result, 2);
 
         // Session epoch is at Integer.MAX_VALUE.
         result = strategy.rotate(partitions, new 
PartitionRotateMetadata(Integer.MAX_VALUE));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 1);
+        validateRotatedListEquals(partitions, result, 1);
 
         // No rotation at same size as epoch.
         result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithSpecialSessionEpochs() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
 
-        LinkedHashMap<TopicIdPartition, Integer> partitions = 
createPartitions(3);
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+        ArrayList<TopicIdPartition> partitions = createPartitions(3);
+        List<TopicIdPartition> result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
 
         result = strategy.rotate(
             partitions,
             new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
         assertEquals(3, result.size());
-        validateRotatedMapEquals(partitions, result, 0);
+        validateRotatedListEquals(partitions, result, 0);
     }
 
     @Test
     public void testRoundRobinStrategyWithEmptyPartitions() {
         PartitionRotateStrategy strategy = 
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
         // Empty partitions.
-        LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new 
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+        List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new 
PartitionRotateMetadata(5));
         // The result should be empty.
         assertTrue(result.isEmpty());
     }
 
     /**
-     * Create an ordered map of TopicIdPartition to partition max bytes.
+     * Create an ordered set of topic partitions.
      * @param size The number of topic-partitions to create.
-     * @return The ordered map of TopicIdPartition to partition max bytes.
+     * @return The ordered set of topic partitions.
      */
-    private LinkedHashMap<TopicIdPartition, Integer> createPartitions(int 
size) {
-        LinkedHashMap<TopicIdPartition, Integer> partitions = new 
LinkedHashMap<>();
+    private ArrayList<TopicIdPartition> createPartitions(int size) {
+        ArrayList<TopicIdPartition> partitions = new ArrayList<>();

Review Comment:
   ```suggestion
           List<TopicIdPartition> partitions = new ArrayList<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to