josefk31 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2962583292


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void 
testFetchResponseWithInvalidRecord(MemoryRecords records, in
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+        int expectedFetchMaxBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            // Explicitly change the configuration here.
+            .withFetchMaxBytes(expectedFetchMaxBytes)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
OptionalLong.empty());
+        FetchRequestData data = (FetchRequestData) fetchRequest.data();
+        assertEquals(expectedFetchMaxBytes, data.maxBytes());
+    }
+
+    @Test
+    public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws 
Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        // Here we are effectively saying that there is no limit to the amount 
of records to return.
+        var remoteMaxSizeBytes = Integer.MAX_VALUE;
+        var localMaxSizeBytes = 1;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        var iterator = records.batchIterator();
+        int offsetCount = 0;
+        int batchCount = 0;
+        while (iterator.hasNext()) {
+            var batch = iterator.next();
+            var recordsIterator = batch.iterator();
+            while (recordsIterator.hasNext()) {
+                var record = recordsIterator.next();
+                assertEquals(offsetCount, record.offset());
+                offsetCount++;
+            }
+            batchCount++;
+        }
+        assertEquals(2, batchCount);
+        assertEquals(6, offsetCount);
+    }
+
+    @Test
+    public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws 
Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        // There are two batches with 3 records each. The first batch is 
always at least larger than 1 byte.
+        // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1 
batch.
+        // If localMaxSizeBytes is used then MockLog will return two batches
+        var remoteMaxSizeBytes = 1;
+        var localMaxSizeBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+                localKey.id(),
+                localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        var iterator = records.batchIterator();
+        var firstBatch = iterator.next();
+        assertEquals(0, firstBatch.baseOffset());
+        assertEquals(3, firstBatch.nextOffset());
+        assertFalse(
+            iterator.hasNext(),
+            String.format("Expected only a single batch to be fetched for 
maxSize = %d", remoteMaxSizeBytes)
+        );
+    }
+
+    @Test
+    public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        var remoteMaxSizeBytes = 115;
+        var localMaxSizeBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .appendToLog(epoch, List.of("c", "c", "c"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), 
KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        // Invariant is that we will always return records in batches and will 
include batches which "go over"
+        // the controller.quorum.fetch.max.bytes configuration.
+        assertTrue(
+            records.sizeInBytes() < remoteMaxSizeBytes * 2,
+            String.format(
+                "Expected records size (%d) < remoteMaxSizeBytes*2 (%d)",
+                records.sizeInBytes(),
+                remoteMaxSizeBytes * 2
+            )
+        );

Review Comment:
   Hmmm. I don't think the implementation actually upholds this invariant. It will 
almost always go somewhat over the max, because it returns whole batches — the 
response only stays within `remoteMaxSizeBytes` when a batch boundary happens to 
land exactly on that size. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to