josefk31 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2706191639


##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -118,14 +118,22 @@ UnifiedLog log() {
     }
 
     @Override
-    public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+    public int defaultLocalReadMaxRecordsSizeBytes() {
+        return config.internalMaxFetchSizeInBytes();
+    }
+
+    @Override
+    public LogFetchInfo read(long startOffset, Isolation readIsolation, int maxRecordSizeBytes) {

Review Comment:
   Done!



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void testFetchResponseWithInvalidRecord(MemoryRecords records, in
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception 
{
+        // Create an explicit test to check that controller.quorum.fetch.max.size.bytes is used to construct fetch

Review Comment:
   Done! Let me know what you think :)



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -1866,8 +1888,7 @@ FetchRequestData fetchRequest(
         long fetchOffset,
         int lastFetchedEpoch,
         OptionalLong highWatermark,
-        int maxWaitTimeMs
-    ) {
+        int maxWaitTimeMs) {

Review Comment:
   Good catch! Done!



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.
+        LogFetchInfo info = log.read(0,
+                Isolation.UNCOMMITTED,
+                batchSizeBytes * 3);
+
+        assertRecords(info, recordsPerBatch * 2, recordsPerBatch);
+    }
+
+    private static void assertRecords(LogFetchInfo info, int numberExpected, int recordsPerBatch) {

Review Comment:
   Hmmm. Since this is a `private static` method and none of the other `private static` utility methods in this class have a Javadoc, I don't think it's really needed.



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }

Review Comment:
   Then 2 records will be returned. It seems like the LogSegment implementation will return at least 1 *batch*. I changed the test slightly to illustrate this.
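
   To make that behaviour concrete, here is a standalone sketch (the `Batch` record and `recordsRead` helper are hypothetical stand-ins, not Kafka APIs) of a "soft max" read that always returns at least the first batch, even when that batch alone exceeds the byte budget:

```java
import java.util.List;

// Hypothetical model of the soft-max read semantics discussed above: batches
// are accumulated until the byte budget is exhausted, but the first batch is
// always returned in full, even when it alone exceeds the budget.
public class SoftMaxReadSketch {
    record Batch(int sizeBytes, int recordCount) {}

    static int recordsRead(List<Batch> log, int maxTotalRecordsSizeBytes) {
        int bytes = 0;
        int records = 0;
        for (Batch batch : log) {
            // Stop only once at least one batch has already been included.
            if (records > 0 && bytes + batch.sizeBytes() > maxTotalRecordsSizeBytes) {
                break;
            }
            bytes += batch.sizeBytes();
            records += batch.recordCount();
        }
        return records;
    }

    public static void main(String[] args) {
        // Two batches, mirroring the test: 1 record, then 2 records.
        List<Batch> log = List.of(new Batch(128, 1), new Batch(128, 2));
        System.out.println(recordsRead(log, 1));   // soft max of 1 byte -> first batch only: 1
        System.out.println(recordsRead(log, 256)); // budget covers both batches: 3
    }
}
```

   Under this model a 1-byte budget against batches of (1 record, 2 records) reads exactly 1 record, which is what the adjusted test asserts.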



##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,26 @@ public interface RaftLog extends AutoCloseable {
 
     /**
      * Read a set of records within a range of offsets.
+     * maxTotalRecordsSizeBytes specifies a "soft" max for total byte size of the records to read.
      */
-    LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
-    LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+    LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int maxTotalRecordsSizeBytes);
+
+    /**
+     * Configures a soft max for total size of bytes read via default read function implementation.
+     * Reads which are called from Fetch requests have a configured soft-max at the Quorum level (KIP-1219).
+     * Most reads are, however, intended to be from a disk based raft log to memory.
+     * The default is intended for local reads with only reads intended to be transmitted as a response to fetch using
+     * the non-default implementation.

Review Comment:
   Fair enough. I removed this comment because I felt like there was too much exposition. The most important thing that an implementer should know is that it should be a "soft-max" with respect to batches. I don't think additional information is needed.
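
   As a hedged illustration of that split (names and types here are invented for the sketch, not the actual `RaftLog` interface), the overload without an explicit max can simply delegate to the configurable default, so only fetch-driven reads need to pass one:

```java
// Hypothetical sketch of the two read paths discussed above: the local-read
// overload falls back to a configurable default soft max, while fetch-driven
// reads supply an explicit one. Names and return types are illustrative only.
public class DefaultReadMaxSketch {
    interface LogSketch {
        int defaultLocalReadMaxRecordsSizeBytes();

        // Explicit soft max, used when serving a Fetch request.
        String read(long startOffsetInclusive, int maxTotalRecordsSizeBytes);

        // Local reads fall back to the configured default.
        default String read(long startOffsetInclusive) {
            return read(startOffsetInclusive, defaultLocalReadMaxRecordsSizeBytes());
        }
    }

    public static void main(String[] args) {
        LogSketch log = new LogSketch() {
            @Override
            public int defaultLocalReadMaxRecordsSizeBytes() {
                return 1024; // stand-in for a configured internal fetch size
            }

            @Override
            public String read(long startOffsetInclusive, int maxTotalRecordsSizeBytes) {
                return "read(" + startOffsetInclusive + ", " + maxTotalRecordsSizeBytes + ")";
            }
        };
        System.out.println(log.read(0L));      // delegates to the default: read(0, 1024)
        System.out.println(log.read(0L, 512)); // explicit max wins: read(0, 512)
    }
}
```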



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1321,7 +1323,7 @@ public void testFetchSnapshotResponsePartialData(boolean withKip853Rpc) throws E
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            context.fetchSnapshotMaxSizeBytes

Review Comment:
   The KIP configuration is `controller.quorum.fetchsnapshot.max.bytes`, so it is set up to resemble the configured value (although it is a bit "different" when compared with the old `KafkaRaftClient.MAX_FETCH_SIZE_BYTES`).



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void testFetchResponseWithInvalidRecord(MemoryRecords records, in
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception {
+        // Create an explicit test to check that controller.quorum.fetch.max.size.bytes is used to construct fetch
+        // requests.
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            // Explicitly change the configuration here.
+            .withFetchMaxSizeBytes(1024)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        // assertFetchRequestData contains a check which verifies the SizeBytes field of the Fetch request.
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());
+    }
+
+    @Test
+    public void testMaxBytesRequestedFromLogsRespectsValueInFetchRequest() throws Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        var localMaxSizeBytes = 1024;
+        var remoteMaxSizeBytes = 512;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+                localKey.id(),
+                localKey.directoryId().get()
+        )
+                .appendToLog(epoch, List.of("a", "b", "c"))
+                .appendToLog(epoch, List.of("d", "e", "f"))
+                .withStartingVoters(
+                        VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), KRaftVersion.KRAFT_VERSION_1
+                )
+                .withUnknownLeader(epoch)
+                .withFetchMaxSizeBytes(localMaxSizeBytes)
+                .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // The next read from MockLog will be intended for a fetch request.
+        // We wish to assert that it uses the value supplied from the fetch.
+        context.log.setExpectedMaxTotalRecordsSizeBytes(remoteMaxSizeBytes);
+
+        // Send a fetch request with max bytes that are different from the configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = context.assertSentFetchPartitionResponse();
+        // Failures for this test will appear in error-logs.

Review Comment:
   Added a comment which is a bit clearer.



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {

Review Comment:
   Great suggestion! Naming things is hard 🤣 



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.

Review Comment:
   Good catch! I removed the comment because I think your [suggestion](https://github.com/apache/kafka/pull/21028#discussion_r2688313678) made the code clearer, so it is no longer needed.



##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
         );
     }
 
+    @Test
+    public void testReadOfDefaultLogValue() throws IOException {
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                128,
+                1
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        // Append twice to ensure we have two batches.
+        append(log, 1, 1);
+        append(log, 2, 1);
+
+        // If the default configured value of 1 is used we will read a single record.
+        LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+        // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+        // only 1 record should be returned.
+        assertRecords(info, 1, 1);
+    }
+
+    @Test
+    public void testNonDefaultReadFromLog() throws IOException {
+        int batchSizeBytes = 1024;
+        int maxSizeToReadBytes = 1;
+        MetadataLogConfig config = createMetadataLogConfig(
+                10240,
+                10 * 1000,
+                10240,
+                60 * 1000,
+                batchSizeBytes,
+                maxSizeToReadBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        // Default value of 1 is NOT used in this case.
+        LogFetchInfo info = log.read(0,
+                Isolation.UNCOMMITTED,
+                batchSizeBytes * 3);

Review Comment:
   Improved the test by parameterizing it and actually worked out an exact 
meaningful value for `batchSizeBytes`. Hopefully it is more clear. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
