apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1638137205


##########
core/src/test/java/kafka/server/SharePartitionTest.java:
##########
@@ -63,4 +111,172 @@ public void testRecordStateForId() {
         // Invalid check.
         assertThrows(IllegalArgumentException.class, () -> 
RecordState.forId((byte) 5));
     }
+
+    @Test
+    public void testAcquireSingleRecord() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(1);
+
+        CompletableFuture<List<AcquiredRecords>> result = 
sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 3, 0, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(1, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(0, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(0L).batchMemberId());
+        assertEquals(1, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(0L).offsetState());
+    }
+
+    @Test
+    public void testAcquireMultipleRecords() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(5, 10);
+
+        CompletableFuture<List<AcquiredRecords>> result = 
sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(10L).batchMemberId());
+        assertEquals(1, 
sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+    }
+
+    @Test
+    public void testAcquireMultipleRecordsWithOverlapAndNewBatch() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(5, 0);
+
+        CompletableFuture<List<AcquiredRecords>> result = 
sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should 
be ignored.
+        records = memoryRecords(10, 0);
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(memoryRecords(5, 5), 
1).toArray(), acquiredRecordsList.toArray());

Review Comment:
   Subset batch triggers when there is different state for batch is being 
tracked, I ll be `acknowledge` functionality in the next PR and will add 
additional tests along.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to