AndrewJSchofield commented on code in PR #20395: URL: https://github.com/apache/kafka/pull/20395#discussion_r2293652364
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1088,6 +1088,672 @@ public void testMaybeInitializeStateBatchesWithoutGaps() {
         assertNull(initialReadGapOffset);
     }
 
+    @Test
+    public void testMaybeInitializeAndAcquire() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);

Review Comment:
   nit: One thing that makes this a bit harder than necessary to review is the inconsistency in the conventions about the offset ranges. For example, this could read `memoryRecords(10,49)` which would align with the `firstOffset, lastOffset` convention used in the persister. Not something that needs to be fixed on this PR, but potentially something to refactor later on.
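
To make the two conventions concrete, here is a minimal sketch of how they relate. It assumes, based only on the test comment ("The records in the batch are from 10 to 49"), that `memoryRecords(40, 10)` takes a record count followed by a start offset. `OffsetRange` and both factory methods are hypothetical names for illustration and are not part of Kafka.

```java
// Hypothetical helper, not part of Kafka: one inclusive offset range that can be
// built from either convention discussed in the review comment.
final class OffsetRange {
    private final long firstOffset;
    private final long lastOffset; // inclusive, as in PersisterStateBatch(firstOffset, lastOffset, ...)

    private OffsetRange(long firstOffset, long lastOffset) {
        if (lastOffset < firstOffset) {
            throw new IllegalArgumentException("lastOffset must be >= firstOffset");
        }
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    // Persister-style convention: (firstOffset, lastOffset), both inclusive.
    static OffsetRange ofFirstAndLast(long firstOffset, long lastOffset) {
        return new OffsetRange(firstOffset, lastOffset);
    }

    // Assumed test-helper convention: (number of records, start offset).
    static OffsetRange ofCountAndStart(int numOfRecords, long startOffset) {
        if (numOfRecords <= 0) {
            throw new IllegalArgumentException("numOfRecords must be positive");
        }
        return new OffsetRange(startOffset, startOffset + numOfRecords - 1);
    }

    long firstOffset() {
        return firstOffset;
    }

    long lastOffset() {
        return lastOffset;
    }

    long count() {
        return lastOffset - firstOffset + 1;
    }
}
```

Under that assumption, `OffsetRange.ofCountAndStart(40, 10)` and `OffsetRange.ofFirstAndLast(10, 49)` describe the same records, which is why the `(10, 49)` form would line up more directly with the persister batches `(15, 18)`, `(20, 22)` and `(26, 30)` in the test.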