chirag-wadhwa5 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1940546264


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -896,6 +896,170 @@ public void testMaybeInitializeWithReadException() {
         assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
     }
 
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ARCHIVED.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(21L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(30, sharePartition.cachedState().get(21L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(3, 
sharePartition.cachedState().get(21L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(21L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(30, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithMultipleGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(30L, 40L, 
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(30L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(30L).batchState());
+        assertEquals(3, 
sharePartition.cachedState().get(30L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(30L, 40L, 
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(15, sharePartition.startOffset());

Review Comment:
   Thanks for the review. Yes, that would be better. There might be situations 
where batches that are not removed from the cache cause problems with the max 
in-flight records limit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to