junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2607602014
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2655,17 +2653,17 @@ public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exceptio
client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
time.sleep(deliverTimeoutMs);
- // expire the batch and process the response
+ // Does not verify expired batch
sender.runOnce();
- assertTrue(request1.isDone());
- assertTrue(request2.isDone());
assertEquals(0, client.inFlightRequestCount());
assertEquals(0, sender.inFlightBatches(tp0).size());
// run again and must not split big batch and resend anything.
sender.runOnce();
assertEquals(0, client.inFlightRequestCount());
assertEquals(0, sender.inFlightBatches(tp0).size());
+ assertTrue(request1.isDone());
Review Comment:
> // run again and must not split big batch and resend anything.
The code doesn't match the comment above. Also, the test fails now and needs
to be adjusted.
```
org.opentest4j.AssertionFailedError:
Expected :0
Actual :1
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
    at org.apache.kafka.clients.producer.internals.SenderTest.testExpiredBatchDoesNotSplitOnMessageTooLargeError(SenderTest.java:2663)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
```
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2714,12 +2712,8 @@ public void testExpiredBatchesInMultiplePartitions() throws Exception {
time.sleep(deliveryTimeoutMs);
sender.runOnce();
assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");
-
- ExecutionException e = assertThrows(ExecutionException.class, request1::get);
- assertInstanceOf(TimeoutException.class, e.getCause());
-
- e = assertThrows(ExecutionException.class, request2::get);
+ assertDoesNotThrow(() -> request1.get());
Review Comment:
Same comment for `testInflightBatchesExpireOnDeliveryTimeout`.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -2533,10 +2534,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep
time.sleep(deliveryTimeoutMs);
sender.runOnce(); // receive first response
assertEquals(0, sender.inFlightBatches(tp0).size(), "Expect zero in-flight batch in accumulator");
- assertInstanceOf(
- TimeoutException.class,
- assertThrows(ExecutionException.class, request::get).getCause(),
- "The expired batch should throw a TimeoutException");
+ assertDoesNotThrow(() -> request.get());
Review Comment:
1. We need to change the test name, since we no longer expire in-flight requests.
2. We can modify this test to provide better coverage: don't send the client response first, let the delivery timeout elapse, and check that the in-flight request remains incomplete. Then send the client response and check that the request completes. With this test, we don't need ProducerIntegrationTest. A rough sketch is below.
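One possible shape of that reshaped test, as a sketch only: the new name is a placeholder, and `appendToAccumulator`/`produceResponse` are assumed to be the existing helpers in `SenderTest`.
```java
// Sketch only: a possible reshaped test under the new "no in-flight expiry" semantics.
// The test name is a placeholder; appendToAccumulator and produceResponse are assumed
// to be the existing SenderTest helpers.
@Test
public void testInflightBatchesDoNotExpireOnDeliveryTimeout() throws Exception {
    Future<RecordMetadata> request = appendToAccumulator(tp0);
    sender.runOnce(); // send the produce request
    assertEquals(1, client.inFlightRequestCount());
    assertEquals(1, sender.inFlightBatches(tp0).size());

    // Let the delivery timeout elapse without a broker response; the in-flight
    // batch should not be expired, so the future must still be incomplete.
    time.sleep(deliveryTimeoutMs);
    sender.runOnce();
    assertFalse(request.isDone());

    // Now deliver the broker response and verify the request completes.
    client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
    sender.runOnce();
    assertTrue(request.isDone());
    assertDoesNotThrow(() -> request.get());
    assertEquals(0, sender.inFlightBatches(tp0).size());
}
```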
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1602,13 +1602,34 @@ private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info
if (validBytes == records.sizeInBytes()) {
return records;
} else {
- // trim invalid bytes
+ // Duplicate the original buffer for trimming and logging purposes.
ByteBuffer validByteBuffer = records.buffer().duplicate();
+
+ // Log detailed information about trimmed bytes
+ validByteBuffer.position(validBytes);
+ byte[] invalidBytes = new byte[records.sizeInBytes() - validBytes];
+ validByteBuffer.get(invalidBytes);
+ String invalidBytesHex = bytesToHex(invalidBytes);
+
+ logger.warn("Trimming invalid bytes from message set for partition {}. Original size: {} bytes, valid bytes: {}, trimmed bytes: {}. " +
Review Comment:
1. We only need to log the amount of trimmed bytes. We don't need to log the content since it's typically binary and not meaningful.
2. Appends from the follower can contain trimmed bytes, so we only want to log the warning when the append comes from the leader. A sketch of both points is below.
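A minimal sketch of both suggestions. It assumes the append origin is threaded into `trimInvalidBytes` (for example as an extra `AppendOrigin` parameter, which the current signature does not take) and that `topicPartition()` is the partition accessor available in `UnifiedLog`.
```java
// Sketch only: assumes an AppendOrigin parameter has been added to trimInvalidBytes.
if (validBytes == records.sizeInBytes()) {
    return records;
} else {
    // Trim the invalid bytes off the end of the message set.
    ByteBuffer validByteBuffer = records.buffer().duplicate();
    validByteBuffer.limit(validBytes);

    // Log only how many bytes were trimmed, and skip the warning for follower
    // (replication) appends, which may legitimately carry trimmed bytes.
    if (origin != AppendOrigin.REPLICATION) {
        logger.warn("Trimming {} invalid bytes from the message set for partition {} " +
                "(original size: {} bytes, valid bytes: {})",
            records.sizeInBytes() - validBytes, topicPartition(), records.sizeInBytes(), validBytes);
    }
    return MemoryRecords.readableRecords(validByteBuffer);
}
```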