AndrewJSchofield commented on code in PR #20395:
URL: https://github.com/apache/kafka/pull/20395#discussion_r2293652364


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1088,6 +1088,672 @@ public void testMaybeInitializeStateBatchesWithoutGaps() {
         assertNull(initialReadGapOffset);
     }
 
+    @Test
+    public void testMaybeInitializeAndAcquire() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);

Review Comment:
   nit: One thing that makes this a bit harder than necessary to review is the inconsistency in the conventions for offset ranges. For example, this could read `memoryRecords(10, 49)`, which would align with the `firstOffset, lastOffset` convention used in the persister. Not something that needs to be fixed in this PR, but potentially something to refactor later on.
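   To make the convention mismatch concrete, here is a minimal, self-contained sketch (not the actual Kafka test helper) of converting the `firstOffset, lastOffset` convention used by `PersisterStateBatch` into the `(numRecords, baseOffset)` arguments that the test's `memoryRecords` helper takes. The method name `toCountAndBase` is hypothetical, invented for this illustration only.

```java
// Hypothetical sketch: bridging the two offset-range conventions.
// firstOffset/lastOffset is inclusive on both ends, as in PersisterStateBatch,
// so memoryRecords(40, 10) corresponds to the inclusive range [10, 49].
public class OffsetRangeSketch {

    // Convert an inclusive [firstOffset, lastOffset] range into the
    // (numRecords, baseOffset) pair expected by a count-based helper.
    static long[] toCountAndBase(long firstOffset, long lastOffset) {
        long numRecords = lastOffset - firstOffset + 1;
        return new long[] {numRecords, firstOffset};
    }

    public static void main(String[] args) {
        // The range 10..49 from the test comment maps back to (40, 10).
        long[] countAndBase = toCountAndBase(10L, 49L);
        System.out.println(countAndBase[0] + "," + countAndBase[1]); // prints "40,10"
    }
}
```

   A range-based helper like this would let the test read `memoryRecords(10, 49)` while reusing the existing count-based implementation underneath.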



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
