jsancio commented on code in PR #16862:
URL: https://github.com/apache/kafka/pull/16862#discussion_r1716095399
##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -729,52 +714,23 @@ public synchronized OptionalLong highWatermark() {
}
@Override
- public long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
+ public long scheduleAppend(
+ int epoch,
+ List<ApiMessageAndVersion> batch
+ ) {
if (batch.isEmpty()) {
throw new IllegalArgumentException("Batch cannot be empty");
}
- List<ApiMessageAndVersion> first = batch.subList(0, batch.size() / 2);
- List<ApiMessageAndVersion> second = batch.subList(batch.size() / 2,
batch.size());
-
- assertEquals(batch.size(), first.size() + second.size());
- assertFalse(second.isEmpty());
-
- OptionalLong firstOffset = first
- .stream()
- .mapToLong(record -> scheduleAtomicAppend(epoch,
- OptionalLong.empty(),
- Collections.singletonList(record)))
- .max();
-
- if (firstOffset.isPresent() &&
resignAfterNonAtomicCommit.getAndSet(false)) {
- // Emulate losing leadership in the middle of a non-atomic append
by not writing
- // the rest of the batch and instead writing a leader change
message
- resign(leader.epoch());
-
- return firstOffset.getAsLong() + second.size();
- } else {
- return second
- .stream()
- .mapToLong(record -> scheduleAtomicAppend(epoch,
- OptionalLong.empty(),
- Collections.singletonList(record)))
- .max()
- .getAsLong();
+ if (throwOnNextAppend.getAndSet(false)) {
+ throw new BufferAllocationException("Test asked to fail the next
scheduleAppend");
}
+
+ return shared.tryAppend(nodeId, leader.epoch(), batch);
}
@Override
- public long scheduleAtomicAppend(
- int epoch,
- OptionalLong requiredEndOffset,
- List<ApiMessageAndVersion> batch
- ) {
- if (batch.isEmpty()) {
- throw new IllegalArgumentException("Batch cannot be empty");
- }
- return shared.tryAppend(nodeId, leader.epoch(), requiredEndOffset,
batch);
- }
+ public void scheduleFlush() { }
Review Comment:
See my other comment. I filed
https://issues.apache.org/jira/browse/KAFKA-17339.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]