apoorvmittal10 commented on code in PR #20395: URL: https://github.com/apache/kafka/pull/20395#discussion_r2298978810
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -1088,6 +1088,672 @@ public void testMaybeInitializeStateBatchesWithoutGaps() { assertNull(initialReadGapOffset); } + @Test + public void testMaybeInitializeAndAcquire() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. + // The records in the batch are from 10 to 49. + MemoryRecords records = memoryRecords(40, 10); + // Set max fetch records to 1, records will be acquired till the first gap is encountered. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 1, + DEFAULT_FETCH_OFFSET, Review Comment: Yeah good to set to the one where fetch happened, done. ########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -1088,6 +1088,672 @@ public void testMaybeInitializeStateBatchesWithoutGaps() { assertNull(initialReadGapOffset); } + @Test + public void testMaybeInitializeAndAcquire() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. + // The records in the batch are from 10 to 49. + MemoryRecords records = memoryRecords(40, 10); + // Set max fetch records to 1, records will be acquired till the first gap is encountered. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 1, + DEFAULT_FETCH_OFFSET, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 5); + + assertArrayEquals(expectedAcquiredRecord(10, 14, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(15L, sharePartition.initialReadGapOffset().gapStartOffset()); + + // Send the same batch again to acquire the next set of records. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 10, + DEFAULT_FETCH_OFFSET, Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org