jsancio commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2961218318
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -359,7 +364,12 @@ public Optional<RawSnapshotWriter>
createNewSnapshot(OffsetAndEpoch snapshotId)
fetches from this offset, the returned batch will start at offset (X
- M), and the
follower will be unable to append it since (X - M) < (X).
*/
- long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ long baseOffset = read(
+ snapshotId.offset(),
+ Isolation.COMMITTED,
+ 1
Review Comment:
Try to add a comment explaining the parameter when using literals.
```java
1 // maxLength
```
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is
always at least larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1
batch.
+ // If localMaxSizeBytes is used then MockLog will return two batches
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ assertEquals(0, firstBatch.baseOffset());
+ assertEquals(3, firstBatch.nextOffset());
+ assertFalse(
+ iterator.hasNext(),
+ String.format("Expected only a single batch to be fetched for
maxSize = %d", remoteMaxSizeBytes)
+ );
+ }
+
+ @Test
+ public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var remoteMaxSizeBytes = 115;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .appendToLog(epoch, List.of("c", "c", "c"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ // Invariant is that we will always return records in batches and will
include batches which "go over"
+ // the controller.quorum.fetch.max.bytes configuration.
+ assertTrue(
+ records.sizeInBytes() < remoteMaxSizeBytes * 2,
+ String.format(
+ "Expected records size (%d) < remoteMaxSizeBytes*2 (%d)",
+ records.sizeInBytes(),
+ remoteMaxSizeBytes * 2
+ )
+ );
Review Comment:
This invariant is not obvious to me. Ideally, you would want this invariant:
```java
(firstBatch.sizeInBytes() == records.sizeInBytes() && records.sizeInBytes()
>= remoteMaxSizeBytes) || records.sizeInBytes() <= remoteMaxSizeBytes
```
or this more general invariant
```java
firstBatch.sizeInBytes() == records.sizeInBytes() || records.sizeInBytes()
<= remoteMaxSizeBytes
```
##########
raft/src/test/java/org/apache/kafka/raft/MockLog.java:
##########
@@ -513,7 +517,11 @@ public Optional<RawSnapshotWriter>
createNewSnapshot(OffsetAndEpoch snapshotId)
);
}
- long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ long baseOffset = read(
+ snapshotId.offset(),
+ Isolation.COMMITTED,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES
Review Comment:
This only needs to read one batch, right? Why not make it size 1 like we do
somewhere else?
##########
raft/src/test/java/org/apache/kafka/raft/MockLogTest.java:
##########
@@ -991,6 +1041,48 @@ public void testValidateValidEpochAndOffset() {
assertEquals(ValidOffsetAndEpoch.Kind.VALID,
resultOffsetAndEpoch.kind());
}
+ @Test
+ public void testMockLogLimits() {
+ appendBatch(10, 5);
+ appendBatch(10, 5);
+ // magicMaxTotalBytes are smaller than 10 simple records in a batch.
+ // Meaning we will read only the first batch and not the second.
+ int magicMaxTotalBytes = 100;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
+ long expectedOffset = 0L;
+ long lastReadOffset = 0L;
+ for (Record record: records.records()) {
+ lastReadOffset = record.offset();
+ assertEquals(expectedOffset, lastReadOffset);
+ expectedOffset += 1;
+ }
Review Comment:
What is this testing? To me this is testing that offsets are monotonically
increasing, no? And that the last offset is 9, right?
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
Review Comment:
How about:
```java
VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
```
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1023,6 +1023,87 @@ public void testFetchSnapshotRequestAsFollower(boolean
withKip853Rpc) throws IOE
assertEquals(leaderId, response.currentLeader().leaderId());
}
+ @Test
+ public void testFetchSnapshotRequestWithPartialData() throws Exception {
+ int localId = randomReplicaId();
+ Set<Integer> voters = Set.of(localId, localId + 1);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+ List<String> records = List.of("foo", "bar");
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch(), List.of("a"))
+ .withKip853Rpc(true)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(snapshotId, 0).get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ snapshot.append(records);
+ snapshot.freeze();
+ }
+
+ // Test that we will respond with at least 2 equally sized read of the
snapshot.
+ RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
+ int snapshotSizeBytes = Math.toIntExact(snapshot.sizeInBytes());
+ int expectedNumberOfReads = 2;
Review Comment:
Okay but reading the code it looks like the expected number of
FETCH_SNAPSHOT is `expectedNumberOfReads + 1`.
##########
raft/src/test/java/org/apache/kafka/raft/MockLog.java:
##########
@@ -444,6 +444,7 @@ public LogFetchInfo read(long startOffset, Isolation
isolation, int maxTotalBatc
// complete batches, so batches which end at an offset larger than
the max offset are
// filtered, which is effectively the same as having the consumer
drop an incomplete
// batch returned in a fetch response.
+ // Exits loop if position of buffer goes beyond maxTotalBatchBytes.
Review Comment:
Ideally a good comment explains the "why" behind the implementation, not the
"what" or "how". The reader can understand the what or how by reading the code.
E.g. why does the code need to exit the loop when the buffer position is beyond
max total batch bytes?
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is
always at least larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1
batch.
+ // If localMaxSizeBytes is used then MockLog will return two batches
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ assertEquals(0, firstBatch.baseOffset());
+ assertEquals(3, firstBatch.nextOffset());
+ assertFalse(
+ iterator.hasNext(),
+ String.format("Expected only a single batch to be fetched for
maxSize = %d", remoteMaxSizeBytes)
+ );
+ }
+
+ @Test
+ public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var remoteMaxSizeBytes = 115;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .appendToLog(epoch, List.of("c", "c", "c"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
Review Comment:
Same here.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is
always at least larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1
batch.
+ // If localMaxSizeBytes is used then MockLog will return two batches
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
Review Comment:
Same comment here. Please add a newline between the parameters. Long
parameter lists on one line are hard to read.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
Review Comment:
How about:
```java
VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
```
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +96,191 @@ private static void
testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is
always at least larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1
batch.
+ // If localMaxSizeBytes is used then MockLog will return two batches
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ assertEquals(0, firstBatch.baseOffset());
+ assertEquals(3, firstBatch.nextOffset());
+ assertFalse(
+ iterator.hasNext(),
+ String.format("Expected only a single batch to be fetched for
maxSize = %d", remoteMaxSizeBytes)
+ );
+ }
+
+ @Test
+ public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
Review Comment:
Aren't all of the tests testing this? "remote fetch used"? To me this test
is testing `testFetchMaxBytesReturnMaxBytesForTwoOrMoreBatches`. I tend to
write a short comment to explain the test if the test method name is too
cryptic. E.g.
```java
public void testFetchMaxBytesFromRemoteFetchUsed() throws Exception {
// test case: FETCH returns at most maxBytes if the first batch is
less than maxBytes.
...
```
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -57,9 +57,16 @@ public interface RaftLog extends AutoCloseable {
LogAppendInfo appendAsFollower(Records records, int epoch);
/**
- * Read a set of records within a range of offsets.
+ * Read a set of records from startOffsetInclusive. Always returns at
least one records batch if one exists.
+ *
+ * @param startOffsetInclusive Records later and including this offset are
returned.
+ * @param isolation The fetch isolation, which controls the maximum offset
we are allowed to read.
+ * @param maxTotalBatchBytes Soft max for number of bytes to retrieve.
Will stop returning batches once the
+ * size of previously returned batches
exceeds maxTotalBatchBytes
+ * @return Records and start offset information wrapped in a LogFetchInfo
*/
- LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+ LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int
maxTotalBatchBytes);
Review Comment:
Did you take a look at the name and description used by UnifiedLog? How
about:
```java
/**
* @param maxLength the maximum number of bytes to read if there are
more than one records batch
*/
LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int
maxLength);
```
##########
raft/src/test/java/org/apache/kafka/raft/MockLogTest.java:
##########
@@ -991,6 +1041,48 @@ public void testValidateValidEpochAndOffset() {
assertEquals(ValidOffsetAndEpoch.Kind.VALID,
resultOffsetAndEpoch.kind());
}
+ @Test
+ public void testMockLogLimits() {
+ appendBatch(10, 5);
+ appendBatch(10, 5);
+ // magicMaxTotalBytes are smaller than 10 simple records in a batch.
+ // Meaning we will read only the first batch and not the second.
+ int magicMaxTotalBytes = 100;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
Review Comment:
Extra spaces for the indentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]