adixitconfluent commented on code in PR #17322:
URL: https://github.com/apache/kafka/pull/17322#discussion_r1803537916


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -229,12 +233,12 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
         ShareFetchData shareFetchData = new ShareFetchData(
                 new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
                         1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
-                new CompletableFuture<>(), partitionMaxBytes);
+                new CompletableFuture<>(), partitionMaxBytes, 100);

Review Comment:
   Can we extract this 100 into a constant? I see it used in many places in this 
file and in other test files.
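
   A minimal sketch of the suggestion, not the PR's actual code. The constant 
name `MAX_FETCH_RECORDS` and the `FetchDataStub` class are hypothetical; the 
stub stands in for `ShareFetchData` so the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;

class MaxFetchRecordsConstantSketch {
    // Hypothetical constant name; the review only asks that the literal 100 be named.
    static final int MAX_FETCH_RECORDS = 100;

    // Simplified stand-in for ShareFetchData (assumption: only two fields kept).
    static final class FetchDataStub {
        final CompletableFuture<Object> future;
        final int maxFetchRecords;

        FetchDataStub(CompletableFuture<Object> future, int maxFetchRecords) {
            this.future = future;
            this.maxFetchRecords = maxFetchRecords;
        }
    }

    // Each test builds its fetch data from the constant instead of repeating 100.
    static FetchDataStub newFetchData() {
        return new FetchDataStub(new CompletableFuture<>(), MAX_FETCH_RECORDS);
    }

    public static void main(String[] args) {
        System.out.println(newFetchData().maxFetchRecords);
    }
}
```

   With the value named once, changing the limit for all tests becomes a 
one-line edit, and the same constant could be shared with the other test files.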



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1209,34 +1352,42 @@ public void testAcquireReleasedRecordMultipleBatches() {
         // Fourth fetch request with 5 records starting from offset 28.
         MemoryRecords records4 = memoryRecords(5, 28);
 
-        List<AcquiredRecords> acquiredRecordsList = sharePartition.acquire(
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
             MEMBER_ID,
+            Integer.MAX_VALUE,
             new FetchPartitionData(Errors.NONE, 40, 3, records1,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)),
+            5);
 
         assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), acquiredRecordsList.toArray());
         assertEquals(15, sharePartition.nextFetchOffset());
 
-        acquiredRecordsList = sharePartition.acquire(
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
             MEMBER_ID,
+            Integer.MAX_VALUE,

Review Comment:
   Same comment as for the 100: can we put Integer.MAX_VALUE in a constant 
such as DEFAULT_MAX_FETCH_RECORDS?



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -433,6 +433,7 @@ class BrokerServer(
         config.shareGroupConfig.shareGroupDeliveryCountLimit,
         config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
         config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
+        config.shareGroupConfig.shareFetchMaxFetchRecords(),

Review Comment:
   nit: To be consistent with the values above, we can drop the 
parentheses.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]