chirag-wadhwa5 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1940546264
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -896,6 +896,170 @@ public void testMaybeInitializeWithReadException() { assertThrows(RuntimeException.class, sharePartition2::maybeInitialize); } + @Test + public void testMaybeInitializeStateBatchesWithGapAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(21L, 30L, RecordState.ARCHIVED.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(21L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + 
assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(30, sharePartition.cachedState().get(21L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(21L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(30, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithMultipleGaps() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, 
sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(30L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(40, sharePartition.cachedState().get(30L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(30L).batchState()); + assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(30L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(15, sharePartition.startOffset()); Review Comment: Thanks for the review. Yes, that will be better. There might be situations where batches that are not removed from the cache cause problems with the max in-flight records limit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org