apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1638137205
########## core/src/test/java/kafka/server/SharePartitionTest.java: ########## @@ -63,4 +111,172 @@ public void testRecordStateForId() { // Invalid check. assertThrows(IllegalArgumentException.class, () -> RecordState.forId((byte) 5)); } + + @Test + public void testAcquireSingleRecord() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(1); + + CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + + List<AcquiredRecords> acquiredRecordsList = result.join(); + assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(1, sharePartition.nextFetchOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertEquals(0, sharePartition.cachedState().get(0L).firstOffset()); + assertEquals(0, sharePartition.cachedState().get(0L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + assertEquals(MEMBER_ID, sharePartition.cachedState().get(0L).batchMemberId()); + assertEquals(1, sharePartition.cachedState().get(0L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(0L).offsetState()); + } + + @Test + public void testAcquireMultipleRecords() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(5, 10); + + CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 3, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + + List<AcquiredRecords> acquiredRecordsList = result.join(); + assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(MEMBER_ID, sharePartition.cachedState().get(10L).batchMemberId()); + assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + } + + @Test + public void testAcquireMultipleRecordsWithOverlapAndNewBatch() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(5, 0); + + CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 3, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + + List<AcquiredRecords> acquiredRecordsList = result.join(); + assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(5, sharePartition.nextFetchOffset()); + + // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should be ignored. + records = memoryRecords(10, 0); + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 3, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + acquiredRecordsList = result.join(); + assertArrayEquals(expectedAcquiredRecords(memoryRecords(5, 5), 1).toArray(), acquiredRecordsList.toArray()); Review Comment: Subset batch triggers when there is different state for batch is being tracked, I ll be `acknowledge` functionality in the next PR and will add additional tests along. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org