apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2546730167
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1866,6 +1888,8 @@ private int acquireSubsetBatchRecords(
) {
lock.writeLock().lock();
int acquiredCount = 0;
+ long maxFetchRecordsWhileBadRecord = -1;
Review Comment:
```suggestion
long maxFetchRecordsWhileThrottledRecords = -1;
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1909,27 @@ private int acquireSubsetBatchRecords(
continue;
}
- InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+ int recordDeliveryCount = offsetState.getValue().deliveryCount();
+ // On last delivery attempt, submit acquired records,
+ // bad record will be delivered alone next time
+ if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+ hasBadRecord = true;
+ break;
+ }
+
+ // When record delivery count reach the throttle threshold, progressively reduce batch size to isolate bad records.
Review Comment:
I'd prefer not to use `bad` here; we should just talk about records that have approached the delivery limit, for which we send individual records.
```suggestion
// When record delivery count reaches the throttle threshold, progressively reduce batch size to isolate records.
```
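For context, one way the progressive reduction could be computed, consistent with the halving test added in this PR (20 -> 10 -> 5 -> 1); a rough sketch with assumed names and formula, not the PR's actual code:
```java
// Hypothetical helper: halve the acquirable count once per delivery attempt at or
// past the throttle threshold, clamped to at least one record.
static long throttledAcquireLimit(int batchRecordCount, int recordDeliveryCount, int throttleRecordsDeliveryLimit) {
    if (recordDeliveryCount < throttleRecordsDeliveryLimit) {
        return batchRecordCount; // No throttling below the threshold.
    }
    int halvings = recordDeliveryCount - throttleRecordsDeliveryLimit + 1;
    return Math.max(1, batchRecordCount >> halvings);
}
```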
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1909,27 @@ private int acquireSubsetBatchRecords(
continue;
}
- InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+ int recordDeliveryCount = offsetState.getValue().deliveryCount();
+ // On last delivery attempt, submit acquired records,
+ // bad record will be delivered alone next time
Review Comment:
```suggestion
// If the record is on its last delivery attempt then isolate that record to be delivered alone.
// If the respective record is corrupt, this prevents increasing the delivery count of multiple
// records in a single response batch. The condition below checks whether the current record has
// reached the delivery limit and there are already some records to return in the response; if so,
// skip processing the current record, which shall be delivered alone in the next fetch.
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +872,16 @@ public ShareAcquiredRecords acquire(
// In record_limit mode, we need to ensure that we do not acquire more than
// maxRecordsToAcquire. Hence, pass the remaining number of records that can
// be acquired.
- int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
- acquiredCount += acquiredSubsetCount;
+ BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+ numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+ acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+ // If a bad record is present, return immediately and set `maxRecordsToAcquire = 0`
+ // to prevent acquiring any new records afterwards.
Review Comment:
```suggestion
// If records are throttled, return immediately and set `maxRecordsToAcquire = 0`
// to prevent acquiring any new records afterwards.
```
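For reference, the return type could be a small Java record; this is a sketch inferred from the `badRecordMarker()` and `acquiredCount()` accessors used in this hunk, so the actual declaration may differ:
```java
// Sketch inferred from the accessors used in the diff; the actual declaration may differ.
private record BadRecordMarkerAndAcquiredCount(boolean badRecordMarker, int acquiredCount) { }
```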
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
// In record_limit mode, we need to ensure that we do not acquire more than
// maxRecordsToAcquire. Hence, pass the remaining number of records that can
// be acquired.
- int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
- acquiredCount += acquiredSubsetCount;
+ BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+ numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+ acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+ // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+ // to prevent acquiring any new records afterwards.
+ if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {
Review Comment:
Correct, but in the example presented we ideally should not return true from `shouldThrottleRecordsDelivery`. If we change the condition in `shouldThrottleRecordsDelivery` to the below, we can avoid that:
```
inFlightBatch.offsetState().values().stream().filter(offsetState -> offsetState.state() == RecordState.AVAILABLE).mapToInt(InFlightState::deliveryCount).max().orElse(0) >= throttleRecordsDeliveryLimit;
```
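For illustration, a minimal sketch of the method with this condition applied; only the stream expression comes from this comment, while the surrounding structure (including the batch-level fallback) is assumed:
```java
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch) {
    if (inFlightBatch.offsetState() == null) {
        // Assumed batch-level check when per-offset state is not tracked.
        return inFlightBatch.batchState() == RecordState.AVAILABLE
            && inFlightBatch.batchDeliveryCount() >= throttleRecordsDeliveryLimit;
    }
    // Suggested condition: only AVAILABLE offsets count, so already acquired or
    // archived offsets do not trigger throttling.
    return inFlightBatch.offsetState().values().stream()
        .filter(offsetState -> offsetState.state() == RecordState.AVAILABLE)
        .mapToInt(InFlightState::deliveryCount)
        .max()
        .orElse(0) >= throttleRecordsDeliveryLimit;
}
```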
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
continue;
}
- InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+ int recordDeliveryCount = offsetState.getValue().deliveryCount();
+ // On last delivery attempt, submit acquired records,
+ // bad record will be delivered alone next time
+ if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+ hasBadRecord = true;
+ break;
+ }
Review Comment:
You are right about the isolation, and it is in line with the checks you have for batches, where we also check twice, before and after. For offsets, though, this should ideally not be needed, unless some offsets are specifically released. Having said that, let's keep your implementation of isolating single records as they approach the delivery limit.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1866,6 +1888,8 @@ private int acquireSubsetBatchRecords(
) {
lock.writeLock().lock();
int acquiredCount = 0;
+ long maxFetchRecordsWhileBadRecord = -1;
+ boolean hasBadRecord = false;
Review Comment:
```suggestion
boolean hasThrottledRecord = false;
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -362,6 +367,7 @@ enum SharePartitionState {
this.leaderEpoch = leaderEpoch;
this.maxInFlightRecords = maxInFlightRecords;
this.maxDeliveryCount = maxDeliveryCount;
+ this.throttleRecordsDeliveryLimit = Math.max(2, (int) Math.ceil((double) maxDeliveryCount / 2));
Review Comment:
Define this as a constant, `MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT`, and use it elsewhere as well.
```suggestion
this.throttleRecordsDeliveryLimit = Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double) maxDeliveryCount / 2));
```
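A minimal sketch of the suggested constant (placement assumed):
```java
// The floor of 2 comes from the original expression's Math.max(2, ...): never
// throttle on the first delivery attempt.
private static final int MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT = 2;
```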
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,15 @@ public ShareAcquiredRecords acquire(
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
- if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+ boolean throttleRecordsDelivery = shouldThrottleRecordsDelivery(inFlightBatch);
+ // Stop acquiring more records if records delivery has to be throttled. The throttling prevents
+ // complete batch to be archived in case of a single record being corrupt.
Review Comment:
```suggestion
// Stop acquiring more records if records delivery has to be throttled. The throttling prevents
// the complete batch from being archived in case a single record is corrupt.
// The check below isolates the current batch/offsets to be delivered individually in subsequent fetches.
```
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testSkipBadRecordWhenPendingDeliveriesExist() {
Review Comment:
```suggestion
public void testThrottleRecordsWhenPendingDeliveriesExist() {
```
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1116,9 +1116,9 @@ public void testMaybeInitializeAndAcquire() {
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
List.of(
- new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0),
Review Comment:
Why change it from `2`? I understand changing the 30L offset below from 3, since that would otherwise have triggered the throttling.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testSkipBadRecordWhenPendingDeliveriesExist() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 7, 13).close();
+ memoryRecordsBuilder(buffer, 20, 8).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Set max fetch records to 500, records should be acquired till the offset 26 of the fetched batch.
+ // 16 records should be returned: 7-19, 23-25
+ // The record at offset 26 has a delivery count of 3 and is a bad record; it should be skipped.
Review Comment:
```suggestion
// The record at offset 26 has a delivery count of 3 and is subject to throttling; it should be skipped.
```
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testSkipBadRecordWhenPendingDeliveriesExist() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 7, 13).close();
+ memoryRecordsBuilder(buffer, 20, 8).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Set max fetch records to 500, records should be acquired till the offset 26 of the fetched batch.
+ // 16 records should be returned: 7-19, 23-25
+ // The record at offset 26 has a delivery count of 3 and is a bad record; it should be skipped.
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 16);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+ assertEquals(26, sharePartition.nextFetchOffset());
+ assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+ assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+ assertEquals(30L, sharePartition.endOffset());
+ assertEquals(3, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() throws InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 34L, RecordState.AVAILABLE.id, (short) 4)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withMaxDeliveryCount(7)
+ .withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+ .build();
+
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(34, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // The record at offset 15 has a delivery count of 4 and is a bad record
+ // First acquisition attempt fails: batch size should be halved (20 -> 10)
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+ // Second failure: batch size halved again (now ~1/4 of original, 20 -> 5)
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+ // Final delivery attempt: only the suspected bad record should be acquired
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1);
+ }
Review Comment:
We do not have any tests for the scenarios mentioned here:
https://github.com/apache/kafka/pull/20837/files#r2544029799
and https://github.com/apache/kafka/pull/20837/files#r2543997891
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testSkipBadRecordWhenPendingDeliveriesExist() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 7, 13).close();
+ memoryRecordsBuilder(buffer, 20, 8).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Set max fetch records to 500, records should be acquired till the offset 26 of the fetched batch.
+ // 16 records should be returned: 7-19, 23-25
+ // The record at offset 26 has a delivery count of 3 and is a bad record; it should be skipped.
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 16);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+ assertEquals(26, sharePartition.nextFetchOffset());
+ assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+ assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+ assertEquals(30L, sharePartition.endOffset());
+ assertEquals(3, sharePartition.deliveryCompleteCount());
Review Comment:
Can we refetch with the same buffer by calling `fetchAcquiredRecords` and check that both offsets 26 and 27 will be delivered?
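Something along these lines at the end of the test; a rough sketch assuming the 5-record batch 26-30 is halved to 2 acquirable records (offsets 26-27, delivery count bumped to 4):
```java
// Refetch with the same buffer; throttling should reduce the available batch 26-30
// to two records, offsets 26-27, now at delivery count 4.
List<AcquiredRecords> refetched = fetchAcquiredRecords(sharePartition.acquire(
    MEMBER_ID,
    ShareAcquireMode.BATCH_OPTIMIZED,
    BATCH_SIZE,
    500,
    26, // Assumed fetch offset: nextFetchOffset after the first acquire.
    fetchPartitionData(records),
    FETCH_ISOLATION_HWM),
    2);
assertArrayEquals(expectedAcquiredRecord(26, 27, 4).toArray(), refetched.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
```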
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1239,9 +1239,9 @@ public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() {
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
List.of(
- new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 0),
Review Comment:
Same as above.
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -11191,6 +11191,144 @@ public void testLsoMovementWithPendingAcknowledgementsForBatches() throws Interr
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testSkipBadRecordWhenPendingDeliveriesExist() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L, RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 7, 13).close();
+ memoryRecordsBuilder(buffer, 20, 8).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Set max fetch records to 500, records should be acquired till the offset 26 of the fetched batch.
+ // 16 records should be returned: 7-19, 23-25
+ // The record at offset 26 has a delivery count of 3 and is a bad record; it should be skipped.
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 16);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+ assertEquals(26, sharePartition.nextFetchOffset());
+ assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+ assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+ assertEquals(30L, sharePartition.endOffset());
+ assertEquals(3, sharePartition.deliveryCompleteCount());
+ }
+
+ @Test
+ public void testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() throws InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+ Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 34L, RecordState.AVAILABLE.id, (short) 4)))))));
+ Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withMaxDeliveryCount(7)
+ .withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+ .build();
+
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
+ Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(34, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // The record at offset 15 has a delivery count of 4 and is a bad record
+ // First acquisition attempt fails: batch size should be halved (20 -> 10)
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+ // Second failure: batch size halved again (now ~1/4 of original, 20 -> 5)
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1);
+
+ // Final delivery attempt: only the suspected bad record should be acquired
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1);
Review Comment:
Can we call this in a loop of 10 to verify that each of the next records is delivered individually? Verify the acquired offset returned as well, and later verify that the remaining 10 records are delivered correctly.
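For example, a rough sketch; the per-fetch expectations (one record per fetch, offsets advancing by one) are my reading of the intended throttling behavior, so the exact counts and offsets may need adjusting:
```java
// Run the fetch in a loop so offsets 15-24 are each delivered individually;
// verify the acquired offset advances by one per fetch.
for (long expectedOffset = 15; expectedOffset <= 24; expectedOffset++) {
    List<AcquiredRecords> single = fetchAcquiredRecords(sharePartition.acquire(
        MEMBER_ID,
        ShareAcquireMode.BATCH_OPTIMIZED,
        BATCH_SIZE,
        500,
        5,
        fetchPartitionData(records),
        FETCH_ISOLATION_HWM),
        1);
    assertEquals(expectedOffset, single.get(0).firstOffset());
    // Allow the acquisition lock to expire before the next attempt.
    mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 1);
}
// The remaining 10 records (offsets 25-34) never hit the throttle path and
// should be delivered together in a single fetch.
fetchAcquiredRecords(sharePartition.acquire(
    MEMBER_ID,
    ShareAcquireMode.BATCH_OPTIMIZED,
    BATCH_SIZE,
    500,
    5,
    fetchPartitionData(records),
    FETCH_ISOLATION_HWM),
    10);
```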
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]