mingyen066 commented on code in PR #21678:
URL: https://github.com/apache/kafka/pull/21678#discussion_r2906296387


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -12427,6 +12427,183 @@ public void 
testDynamicDeliveryCountIncreaseAllowsMoreDeliveries() {
         assertFalse(sharePartition.cachedState().isEmpty());
     }
 
+    @Test
+    public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(2000)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxInFlightRecords() should return the group config value, not the default.
+        assertEquals(5000, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void testMaxInFlightRecordsFallsBackToDefaultWhenNoGroupConfig() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(2000)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // maxInFlightRecords() should return the default value.
+        assertEquals(2000, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksDecrease() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 50);
+
+        // Acquire 50 records, which is under the default limit of 100.
+        fetchAcquiredRecords(sharePartition, records, 50);
+
+        // Dynamically decrease the limit to 30 via group config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // The effective limit should now be 30.
+        assertEquals(30, sharePartition.maxInFlightRecords());
+
+        // 50 in-flight > 30 limit, but canAcquireRecords checks
+        // nextFetchOffset != endOffset + 1 first. Since all records [0-49] are
+        // acquired, nextFetchOffset == endOffset + 1 == 50, so the second check
+        // (numInFlightRecords < maxInFlightRecords) applies: 50 < 30 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksIncrease() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(10)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 10);
+
+        // Acquire 10 records, hitting the default limit.
+        fetchAcquiredRecords(sharePartition, records, 10);
+
+        // canAcquireRecords should be false with default limit of 10.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Increase limit to 500 via group config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        assertEquals(500, sharePartition.maxInFlightRecords());
+
+        // Now canAcquireRecords should be true: 10 < 500.
+        assertTrue(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksExactBoundary() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(50)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records = memoryRecords(0, 50);
+
+        // Acquire exactly 50 records, hitting the default limit exactly.
+        fetchAcquiredRecords(sharePartition, records, 50);
+
+        // At exact boundary: 50 < 50 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Dynamically set limit to exactly the in-flight count via group config.
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        // Still at boundary: 50 < 50 is false.
+        assertFalse(sharePartition.canAcquireRecords());
+
+        // Increase by 1 to cross the boundary.
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51);
+
+        // Now 50 < 51 is true.
+        assertTrue(sharePartition.canAcquireRecords());
+    }
+
+    @Test
+    public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
+        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)
+            .withGroupConfigManager(groupConfigManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Group config sets limit to 500.
+        assertEquals(500, sharePartition.maxInFlightRecords());
+
+        // Remove group config — should fall back to default of 100.
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        assertEquals(100, sharePartition.maxInFlightRecords());
+    }
+
+    @Test
+    public void 
testDynamicPartitionMaxRecordLocksDecreaseBelowInFlightAffectsMaxRecordsToAcquire()
 {
+        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withMaxInflightRecords(100)

Review Comment:
   Right. I removed the explicit `withMaxInflightRecords()` call and renamed the
test constants/builder fields to align with the `SharePartition` parameter names
(e.g. `MAX_IN_FLIGHT_RECORDS` → `DEFAULT_MAX_IN_FLIGHT_RECORDS`,
`MAX_DELIVERY_COUNT` → `DEFAULT_MAX_DELIVERY_COUNT`).
   I also changed the type from `short` to `int` to match the return type of
`sharePartitionMaxRecordLocks()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to