josefk31 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2962583292
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is
always at least larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1
batch.
+ // If localMaxSizeBytes is used then MockLog will return two batches
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ assertEquals(0, firstBatch.baseOffset());
+ assertEquals(3, firstBatch.nextOffset());
+ assertFalse(
+ iterator.hasNext(),
+ String.format("Expected only a single batch to be fetched for
maxSize = %d", remoteMaxSizeBytes)
+ );
+ }
+
+ @Test
+ public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var remoteMaxSizeBytes = 115;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .appendToLog(epoch, List.of("c", "c", "c"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ // Invariant is that we will always return records in batches and will
include batches which "go over"
+ // the controller.quorum.fetch.max.bytes configuration.
+ assertTrue(
+ records.sizeInBytes() < remoteMaxSizeBytes * 2,
+ String.format(
+ "Expected records size (%d) < remoteMaxSizeBytes*2 (%d)",
+ records.sizeInBytes(),
+ remoteMaxSizeBytes * 2
+ )
+ );
Review Comment:
Hmmm. I don't think the implementation actually upholds this invariant. It will
always overshoot the max slightly, because it returns whole batches — the response
only stays exactly at `remoteMaxSizeBytes` if a batch boundary happens to land
precisely on that size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]