chirag-wadhwa5 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1940543756
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -896,6 +896,170 @@ public void testMaybeInitializeWithReadException() { assertThrows(RuntimeException.class, sharePartition2::maybeInitialize); } + @Test + public void testMaybeInitializeStateBatchesWithGapAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(21L, 30L, RecordState.ARCHIVED.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(21L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + 
assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(30, sharePartition.cachedState().get(21L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(21L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(30, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithMultipleGaps() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, 
sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(30L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(40, sharePartition.cachedState().get(30L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(30L).batchState()); + assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(30L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(15, sharePartition.startOffset()); Review Comment: Sorry, missed this line. Added another test case for this. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // E.g., the cachedState has the following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case, + // lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of the next cachedState batch (next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30) + // and the startOffset should be at 21. 
Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset + // and the higher key of lastOffsetAcknowledged. + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); Review Comment: Sorry, missed this line. Added another test case for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org