apoorvmittal10 commented on code in PR #19148: URL: https://github.com/apache/kafka/pull/19148#discussion_r1989856233
########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -365,14 +345,14 @@ public void testNewContext() { new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true)); // Test generating a throttled response for a subsequent share session - ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, + ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyList(), EMPTY_PART_LIST, Review Comment: ```suggestion ShareFetchContext context7 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, ``` ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -458,7 +434,7 @@ public void testShareSessionExpiration() { time.sleep(500); // Create a subsequent share fetch context for session 1 - ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, + ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyList(), EMPTY_PART_LIST, Review Comment: ```suggestion ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, ``` ########## server/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java: ########## @@ -20,47 +20,48 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** * Helper class to return the erroneous partitions and valid partition data */ public class ErroneousAndValidPartitionData { private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous; - private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions; + private final List<TopicIdPartition> validTopicIdPartitions; public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous, - Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions) { + List<TopicIdPartition> validTopicIdPartitions) { this.erroneous = erroneous; this.validTopicIdPartitions = validTopicIdPartitions; } - public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) { + public ErroneousAndValidPartitionData(List<TopicIdPartition> shareFetchData) { erroneous = new HashMap<>(); - validTopicIdPartitions = new HashMap<>(); - shareFetchData.forEach((topicIdPartition, sharePartitionData) -> { + validTopicIdPartitions = new ArrayList<>(); + shareFetchData.forEach(topicIdPartition -> { if (topicIdPartition.topic() == null) { erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)); } else { - validTopicIdPartitions.put(topicIdPartition, sharePartitionData); + validTopicIdPartitions.add(topicIdPartition); } }); } public ErroneousAndValidPartitionData() { this.erroneous = new HashMap<>(); - this.validTopicIdPartitions = new HashMap<>(); + this.validTopicIdPartitions = new ArrayList<>(); Review Comment: Do we ever modify `erroneous` or `validTopicIdPartitions` after initializing them via the default constructor? I am wondering whether it is correct to keep these as mutable collections.
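To make the mutability concern concrete, here is a rough sketch of one possible direction, not the PR's implementation: build the collections locally, then store unmodifiable copies so nothing can mutate the fields after construction. The class name `ErroneousAndValidPartitionDataSketch` and the local variable names are made up for illustration; only `ShareFetchResponse.partitionResponse` and `topicIdPartition.topic()` are taken from the quoted diff.

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareFetchResponse;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only (not the PR's code): fields hold unmodifiable collections.
public class ErroneousAndValidPartitionDataSketch {
    private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous;
    private final List<TopicIdPartition> validTopicIdPartitions;

    public ErroneousAndValidPartitionDataSketch(List<TopicIdPartition> shareFetchData) {
        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneousLocal = new HashMap<>();
        List<TopicIdPartition> validLocal = new ArrayList<>();
        shareFetchData.forEach(topicIdPartition -> {
            if (topicIdPartition.topic() == null) {
                erroneousLocal.put(topicIdPartition,
                    ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID));
            } else {
                validLocal.add(topicIdPartition);
            }
        });
        // Map.copyOf/List.copyOf (Java 10+) return unmodifiable copies.
        this.erroneous = Map.copyOf(erroneousLocal);
        this.validTopicIdPartitions = List.copyOf(validLocal);
    }

    public ErroneousAndValidPartitionDataSketch() {
        // Empty, unmodifiable defaults instead of mutable HashMap/ArrayList.
        this.erroneous = Map.of();
        this.validTopicIdPartitions = List.of();
    }
}
```

Whether that trade-off makes sense depends on whether a default-constructed instance is ever filled in afterwards, which is exactly the question above.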
########## server/src/main/java/org/apache/kafka/server/share/ErroneousAndValidPartitionData.java: ########## @@ -20,47 +20,48 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** * Helper class to return the erroneous partitions and valid partition data */ public class ErroneousAndValidPartitionData { private final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous; - private final Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions; + private final List<TopicIdPartition> validTopicIdPartitions; public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchResponseData.PartitionData> erroneous, - Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> validTopicIdPartitions) { + List<TopicIdPartition> validTopicIdPartitions) { this.erroneous = erroneous; this.validTopicIdPartitions = validTopicIdPartitions; } - public ErroneousAndValidPartitionData(Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> shareFetchData) { + public ErroneousAndValidPartitionData(List<TopicIdPartition> shareFetchData) { erroneous = new HashMap<>(); - validTopicIdPartitions = new HashMap<>(); - shareFetchData.forEach((topicIdPartition, sharePartitionData) -> { + validTopicIdPartitions = new ArrayList<>(); + shareFetchData.forEach(topicIdPartition -> { if (topicIdPartition.topic() == null) { erroneous.put(topicIdPartition, ShareFetchResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)); Review Comment: Question: we are checking the topic name here but filling in the error as `UNKNOWN_TOPIC_ID`; is that correct? ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -365,14 +345,14 @@ public void testNewContext() { new ShareRequestMetadata(shareSessionKey2.memberId(), 5), true)); // Test generating a throttled response for a subsequent share session - ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, + ShareFetchContext context7 = sharePartitionManager.newContext(groupId, Collections.emptyList(), EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), true); ShareFetchResponse resp7 = context7.throttleResponse(100); assertEquals(Errors.NONE, resp7.error()); assertEquals(100, resp7.throttleTimeMs()); // Get the final share session.
- ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, + ShareFetchContext context8 = sharePartitionManager.newContext(groupId, Collections.emptyList(), EMPTY_PART_LIST, Review Comment: ```suggestion ShareFetchContext context8 = sharePartitionManager.newContext(groupId, List.of(), EMPTY_PART_LIST, ``` ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -201,17 +198,15 @@ public void testNewContextReturnsFinalContextWithoutRequestData() { Uuid memberId = Uuid.randomUuid(); // Create a new share session with an initial share fetch request - Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> reqData1 = new LinkedHashMap<>(); - reqData1.put(tp0, new ShareFetchRequest.SharePartitionData(tp0.topicId(), PARTITION_MAX_BYTES)); - reqData1.put(tp1, new ShareFetchRequest.SharePartitionData(tp1.topicId(), PARTITION_MAX_BYTES)); + List<TopicIdPartition> reqData1 = List.of(tp0, tp1); ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH); ShareFetchContext context1 = sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, false); assertInstanceOf(ShareSessionContext.class, context1); assertFalse(((ShareSessionContext) context1).isSubsequent()); ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH); - ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), Collections.emptyList(), reqMetadata2, true); + ShareFetchContext context2 = sharePartitionManager.newContext(groupId, Collections.emptyList(), Collections.emptyList(), reqMetadata2, true); Review Comment: ```suggestion ShareFetchContext context2 = sharePartitionManager.newContext(groupId, List.of(), List.of(), reqMetadata2, true); ``` ########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -59,7 +59,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)) ) - val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, Map.empty) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) Review Comment: Remove the `MAX_PARTITION_BYTES` constant from the file. ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -6083,8 +6083,8 @@ class ReplicaManagerTest { try { val groupId = "grp" val tp1 = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo1", 0)) - val partitionMaxBytes = new util.LinkedHashMap[TopicIdPartition, Integer] - partitionMaxBytes.put(tp1, 1000) + val topicPartitions = new util.ArrayList[TopicIdPartition] + topicPartitions.add(tp1) Review Comment: Do we need a mutable list here? Otherwise we can use `util.List.of`.
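On the mutable-list question just above: `util.List.of` gives an unmodifiable list, which is enough when the test only ever holds a fixed set of partitions, while an `ArrayList` is only needed if entries are added later. A small illustrative check in Java (the class and variable names here are made up for the example; the `TopicIdPartition` constructor is the one used in the quoted test):

```java
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

import java.util.ArrayList;
import java.util.List;

public class ListChoiceDemo {
    public static void main(String[] args) {
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));

        // Immutable: sufficient when the contents are fixed for the whole test.
        List<TopicIdPartition> fixed = List.of(tp1);

        // Mutable: only needed if the test keeps adding partitions afterwards.
        List<TopicIdPartition> growable = new ArrayList<>();
        growable.add(tp1);

        // fixed.add(tp1) would throw UnsupportedOperationException at runtime.
        System.out.println(fixed.size() + " " + growable.size());
    }
}
```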
########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -1163,7 +1144,7 @@ public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() { public void testOnCompleteExecutionOnTimeout() { ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), - new CompletableFuture<>(), new LinkedHashMap<>(), BATCH_SIZE, MAX_FETCH_RECORDS, + new CompletableFuture<>(), new ArrayList<>(), BATCH_SIZE, MAX_FETCH_RECORDS, Review Comment: ```suggestion new CompletableFuture<>(), List.of(), BATCH_SIZE, MAX_FETCH_RECORDS, ``` ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4095,9 +4089,8 @@ class KafkaApisTest extends Logging { ) when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn( - new ShareSessionContext(new ShareRequestMetadata(memberId, 0), Map( - new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> - new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + new ShareSessionContext(new ShareRequestMetadata(memberId, 0), List( + new TopicIdPartition(topicId, partitionIndex, topicName) ).asJava) Review Comment: Use `util.List.of`. ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -80,20 +81,8 @@ static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( return topicIdPartitions; } - // TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation - // will be a simple operation. Else consider using ImplicitLinkedHashCollection. - LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt); - LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size()); - int i = 0; - for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) { - if (i < rotateAt) { - suffixPartitions.put(entry.getKey(), entry.getValue()); - } else { - rotatedPartitions.put(entry.getKey(), entry.getValue()); - } - i++; - } - rotatedPartitions.putAll(suffixPartitions); + List<TopicIdPartition> rotatedPartitions = new ArrayList<>(topicIdPartitions); Review Comment: Also, please add a comment noting that a copy is created here because we do not want to modify the input list. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4044,7 +4039,6 @@ class KafkaApisTest extends Logging { setPartitions(List( new ShareFetchRequestData.FetchPartition() .setPartitionIndex(0) - .setPartitionMaxBytes(partitionMaxBytes) ).asJava) ).asJava) Review Comment: ``` shareFetchRequestData = new ShareFetchRequestData(). setGroupId(groupId). setMemberId(memberId.toString). setShareSessionEpoch(-1). setTopics(util.List.of(new ShareFetchRequestData.FetchTopic(). setTopicId(topicId). setPartitions(util.List.of( new ShareFetchRequestData.FetchPartition() .setPartitionIndex(0) .setAcknowledgementBatches(util.List.of( new AcknowledgementBatch() .setFirstOffset(0) .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte)) )) )) )) ``` ########## server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java: ########## @@ -80,20 +81,8 @@ static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin( return topicIdPartitions; } - // TODO: Once the partition max bytes is removed then the partition will be a linked list and rotation - will be a simple operation. Else consider using ImplicitLinkedHashCollection.
- LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new LinkedHashMap<>(rotateAt); - LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new LinkedHashMap<>(topicIdPartitions.size()); - int i = 0; - for (Map.Entry<TopicIdPartition, Integer> entry : topicIdPartitions.entrySet()) { - if (i < rotateAt) { - suffixPartitions.put(entry.getKey(), entry.getValue()); - } else { - rotatedPartitions.put(entry.getKey(), entry.getValue()); - } - i++; - } - rotatedPartitions.putAll(suffixPartitions); + List<TopicIdPartition> rotatedPartitions = new ArrayList<>(topicIdPartitions); + Collections.rotate(rotatedPartitions, -1 * rotateAt); Review Comment: I think for elements [1,2,3] rotated at 1 we want the output to be [2,3,1] rather than [3,1,2], which is why you negated the distance. Can you please add a comment explaining the `-1`? (A small standalone check of this is sketched below, after the test diffs.) ########## server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java: ########## @@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest { @Test public void testRoundRobinStrategy() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); + ArrayList<TopicIdPartition> partitions = createPartitions(3); Review Comment: ```suggestion List<TopicIdPartition> partitions = createPartitions(3); ``` ########## server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java: ########## @@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest { @Test public void testRoundRobinStrategy() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); + ArrayList<TopicIdPartition> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); + List<TopicIdPartition> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // Session epoch is greater than the number of partitions. result = strategy.rotate(partitions, new PartitionRotateMetadata(5)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 2); + validateRotatedListEquals(partitions, result, 2); // Session epoch is at Integer.MAX_VALUE. result = strategy.rotate(partitions, new PartitionRotateMetadata(Integer.MAX_VALUE)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // No rotation at same size as epoch.
result = strategy.rotate(partitions, new PartitionRotateMetadata(3)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithSpecialSessionEpochs() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate( + ArrayList<TopicIdPartition> partitions = createPartitions(3); + List<TopicIdPartition> result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithEmptyPartitions() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); // Empty partitions. - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new LinkedHashMap<>(), new PartitionRotateMetadata(5)); + List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new PartitionRotateMetadata(5)); // The result should be empty. assertTrue(result.isEmpty()); } /** - * Create an ordered map of TopicIdPartition to partition max bytes. + * Create an ordered set of topic partitions. * @param size The number of topic-partitions to create. - * @return The ordered map of TopicIdPartition to partition max bytes. + * @return The ordered set of topic partitions. */ - private LinkedHashMap<TopicIdPartition, Integer> createPartitions(int size) { - LinkedHashMap<TopicIdPartition, Integer> partitions = new LinkedHashMap<>(); + private ArrayList<TopicIdPartition> createPartitions(int size) { Review Comment: ```suggestion private List<TopicIdPartition> createPartitions(int size) { ``` ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4095,9 +4089,8 @@ class KafkaApisTest extends Logging { ) when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn( - new ShareSessionContext(new ShareRequestMetadata(memberId, 0), Map( - new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> - new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + new ShareSessionContext(new ShareRequestMetadata(memberId, 0), List( + new TopicIdPartition(topicId, partitionIndex, topicName) ).asJava) Review Comment: And at other places below. ########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -458,7 +434,7 @@ public void testShareSessionExpiration() { time.sleep(500); // Create a subsequent share fetch context for session 1 - ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyMap(), EMPTY_PART_LIST, + ShareFetchContext session1context2 = sharePartitionManager.newContext(groupId, Collections.emptyList(), EMPTY_PART_LIST, Review Comment: And at other places changed in the PR. 
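Returning to the `PartitionRotateStrategy` comment about the negated rotation distance: the sketch below is plain JDK code, independent of Kafka, and only illustrates why `-1 * rotateAt` produces the intended ordering, assuming the goal is to move the element at index `rotateAt` to the front of the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RotateDemo {
    public static void main(String[] args) {
        int rotateAt = 1;
        // Collections.rotate mutates the list in place, so a mutable copy is needed.
        List<Integer> partitions = new ArrayList<>(List.of(1, 2, 3));

        // With a positive distance, Collections.rotate shifts elements towards the tail:
        // rotate([1, 2, 3], 1) -> [3, 1, 2].
        // Negating the distance shifts towards the head, which is the round-robin order
        // described in the comment: rotate([1, 2, 3], -1) -> [2, 3, 1].
        Collections.rotate(partitions, -1 * rotateAt);

        System.out.println(partitions); // prints [2, 3, 1]
    }
}
```

So the negative distance is what turns `Collections.rotate`'s default "shift towards the tail" behaviour into the "start consuming from index rotateAt" ordering that the tests above validate.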
########## server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java: ########## @@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest { @Test public void testRoundRobinStrategy() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); + ArrayList<TopicIdPartition> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); + List<TopicIdPartition> result = strategy.rotate(partitions, new PartitionRotateMetadata(1)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // Session epoch is greater than the number of partitions. result = strategy.rotate(partitions, new PartitionRotateMetadata(5)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 2); + validateRotatedListEquals(partitions, result, 2); // Session epoch is at Integer.MAX_VALUE. result = strategy.rotate(partitions, new PartitionRotateMetadata(Integer.MAX_VALUE)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 1); + validateRotatedListEquals(partitions, result, 1); // No rotation at same size as epoch. result = strategy.rotate(partitions, new PartitionRotateMetadata(3)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithSpecialSessionEpochs() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); - LinkedHashMap<TopicIdPartition, Integer> partitions = createPartitions(3); - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate( + ArrayList<TopicIdPartition> partitions = createPartitions(3); + List<TopicIdPartition> result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); result = strategy.rotate( partitions, new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH)); assertEquals(3, result.size()); - validateRotatedMapEquals(partitions, result, 0); + validateRotatedListEquals(partitions, result, 0); } @Test public void testRoundRobinStrategyWithEmptyPartitions() { PartitionRotateStrategy strategy = PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN); // Empty partitions. - LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new LinkedHashMap<>(), new PartitionRotateMetadata(5)); + List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new PartitionRotateMetadata(5)); // The result should be empty. assertTrue(result.isEmpty()); } /** - * Create an ordered map of TopicIdPartition to partition max bytes. + * Create an ordered set of topic partitions. * @param size The number of topic-partitions to create. - * @return The ordered map of TopicIdPartition to partition max bytes. + * @return The ordered set of topic partitions. 
*/ - private LinkedHashMap<TopicIdPartition, Integer> createPartitions(int size) { - LinkedHashMap<TopicIdPartition, Integer> partitions = new LinkedHashMap<>(); + private ArrayList<TopicIdPartition> createPartitions(int size) { + ArrayList<TopicIdPartition> partitions = new ArrayList<>(); Review Comment: ```suggestion List<TopicIdPartition> partitions = new ArrayList<>(); ```
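The `validateRotatedListEquals` helper used throughout the test diff above is not part of the quoted changes, so the following is only a guess at what such an assertion could look like; the helper name `assertRotatedBy` is hypothetical, and JUnit 5 assertions are assumed since the surrounding tests use `assertEquals`.

```java
import org.apache.kafka.common.TopicIdPartition;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class RotationAssertions {
    // Hypothetical helper, not the one in PartitionRotateStrategyTest: element i of the
    // rotated list is expected to be element (i + rotationAt) % size of the original list,
    // matching the [1,2,3] -> [2,3,1] example discussed earlier (rotationAt = 0 means no rotation).
    static void assertRotatedBy(List<TopicIdPartition> original, List<TopicIdPartition> rotated, int rotationAt) {
        assertEquals(original.size(), rotated.size());
        for (int i = 0; i < original.size(); i++) {
            assertEquals(original.get((i + rotationAt) % original.size()), rotated.get(i));
        }
    }
}
```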