junrao commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1945434837
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
        }
        return partition;
    }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
+            return records;
+        }
+        // The acquired records should be non-empty, do not check as the method is called only when the
+        // acquired records are non-empty.
+        List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
+        try {
+            final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
+            final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
+            int startPosition = 0;
+            int size = 0;
+            // Track the previous batch to adjust the start position in case the first acquired offset
+            // is between the batch.
+            FileChannelRecordBatch previousBatch = null;
+            for (FileChannelRecordBatch batch : fileRecords.batches()) {
+                // If the batch base offset is less than the first acquired offset, then the start position
+                // should be updated to skip the batch.
+                if (batch.baseOffset() < firstAcquiredOffset) {
Review Comment:
Hmm, not sure why we need to maintain `previousBatch` below. Could we just set `startPosition` when `batch.lastOffset()` is >= `firstAcquiredOffset` for the first time?
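Something like the following rough, untested sketch (it reuses `fileRecords`, `firstAcquiredOffset` and `lastAcquiredOffset` from the surrounding method; `startFound` is a new local I'm introducing just for illustration):

```java
// Sketch: take startPosition from the first batch that reaches the acquired
// range, so previousBatch is no longer needed.
int startPosition = 0;
int size = 0;
boolean startFound = false;
for (FileChannelRecordBatch batch : fileRecords.batches()) {
    if (!startFound && batch.lastOffset() >= firstAcquiredOffset) {
        // First batch overlapping the acquired range; slicing starts here.
        startPosition = batch.position();
        startFound = true;
    }
    // Batches entirely past the acquired range are not part of the slice.
    if (batch.baseOffset() > lastAcquiredOffset) {
        break;
    }
    if (startFound) {
        size += batch.sizeInBytes();
    }
}
```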
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -678,6 +680,9 @@ public ShareAcquiredRecords acquire(
        for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
            // If the acquired count is equal to the max fetch records then break the loop.
            if (acquiredCount >= maxFetchRecords) {
+                // If the limit to acquire records is reached then it means there exists additional
+                // fetch batches which cannot be acquired.
+                subsetAcquired = true;
Review Comment:
It's a bit messy to maintain this. Do we really need this since we can find
this out by iterating the batches, which is relatively cheap?
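For example, a rough, untested sketch of what I have in mind (`isSubsetAcquired` is a hypothetical helper, not part of this patch; iterating the batches only reads the batch headers and doesn't decompress the records):

```java
import java.util.List;

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

// Sketch: derive "subset acquired" after acquisition by checking whether the
// fetched batches extend past the last acquired offset.
private static boolean isSubsetAcquired(Records fetchedRecords, List<AcquiredRecords> acquiredRecords) {
    long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
    long lastFetchedOffset = -1L;
    for (RecordBatch batch : fetchedRecords.batches()) {
        lastFetchedOffset = batch.lastOffset();
    }
    return lastFetchedOffset > lastAcquiredOffset;
}
```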
##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
        }
        return partition;
    }
+
+    /**
+     * Slice the fetch records based on the acquired records. The slicing is done based on the first
+     * and last offset of the acquired records from the list. The slicing doesn't consider individual
+     * acquired batches rather the boundaries of the acquired list.
+     *
+     * @param records The records to be sliced.
+     * @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
+     * @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
+     *         of the fetched records. Otherwise, the original records are returned.
+     */
+    static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
+        if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
Review Comment:
Hmm, with remote storage, it's possible for the records to be of type MemoryRecords. It would be useful to slice those too.
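If we go that way, the slice itself could presumably be built by narrowing the backing buffer, along these lines (rough, untested sketch; `sliceMemoryRecords` is a hypothetical helper, and `startPosition`/`size` would have to be computed by walking the batches and summing `sizeInBytes()`, since memory batches have no file position):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;

// Sketch: zero-copy MemoryRecords view over [startPosition, startPosition + size).
private static MemoryRecords sliceMemoryRecords(MemoryRecords records, int startPosition, int size) {
    ByteBuffer buffer = records.buffer().duplicate();
    buffer.position(startPosition);
    buffer.limit(startPosition + size);
    return MemoryRecords.readableRecords(buffer.slice());
}
```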
##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -449,4 +468,202 @@ tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
        Mockito.verify(exceptionHandler, times(1)).accept(new SharePartitionKey("grp", tp0), exception);
        Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
+        // Create 1 batch of records with 10 records.
+        FileRecords records = createFileRecords(Map.of(5L, 10));
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of first offset bound should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of last offset bound should return the records for the matching batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, starting from the first offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, ending at the last offset.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, in between the batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of base offset bounds should return empty records.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(2).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of last offset bounds should return empty records.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(15).setLastOffset(18).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException {
+        // Create 3 batches of records with 3, 2 and 4 records respectively.
+        LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+        offsetValues.put(0L, 3);
+        offsetValues.put(3L, 2);
+        offsetValues.put(7L, 4); // Gap of 2 offsets between batches.
+        FileRecords records = createFileRecords(offsetValues);
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short) 1));
+        Records slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 11, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets from all batches, but only first record from last batch. Should return
+        // all batches.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only first batch offsets, should return only first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        List<RecordBatch> recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only second batch offsets, should return only second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only last batch offsets, should return only last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of first batch offsets, should return only first batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of second batch offsets, should return only second batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of last batch offsets, should return only last batch.
+        acquiredRecords = List.of(new AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire including gaps between batches, should return 2 batches.
Review Comment:
Hmm, there are no gaps between batches, right?
##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -369,49 +379,58 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(),
            new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);
-        MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
-            new SimpleRecord("0".getBytes(), "v".getBytes()),
-            new SimpleRecord("1".getBytes(), "v".getBytes()),
-            new SimpleRecord("2".getBytes(), "v".getBytes()),
-            new SimpleRecord(null, "value".getBytes()));
+        LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+        offsetValues.put(0L, 1);
+        offsetValues.put(1L, 1);
+        offsetValues.put(2L, 1);
+        offsetValues.put(3L, 1);
+        Records records1 = createFileRecords(offsetValues);
+
+        offsetValues.clear();
+        offsetValues.put(100L, 4);
+        Records records2 = createFileRecords(offsetValues);
        FetchPartitionData fetchPartitionData1 = new FetchPartitionData(Errors.NONE, 0L, 0L,
            records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        FetchPartitionData fetchPartitionData2 = new FetchPartitionData(Errors.NONE, 0L, 0L,
-            records1, Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            records2, Optional.empty(), OptionalLong.empty(), Optional.empty(),
            OptionalInt.empty(), false);
        when(sp0.acquire(memberId.toString(), BATCH_SIZE, 10, fetchPartitionData1)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 1), true));
        when(sp1.acquire(memberId.toString(), BATCH_SIZE, 8, fetchPartitionData2)).thenReturn(
-            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
-                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)));
+            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1), true));
        // Send the topic partitions in order so can validate if correct mock is called, accounting
        // the offset count for the acquired records from the previous share partition acquire.
        Map<TopicIdPartition, FetchPartitionData> responseData1 = new LinkedHashMap<>();
        responseData1.put(tp0, fetchPartitionData1);
        responseData1.put(tp1, fetchPartitionData2);
-        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData1 =
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData =
Review Comment:
responseData1 => responseData ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]