junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2644024076
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -174,43 +174,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
this.accumulator.deallocate(batch);
}
- /**
- * Get the in-flight batches that has reached delivery timeout.
- */
- private List<ProducerBatch> getExpiredInflightBatches(long now) {
Review Comment:
In the current implementation, failing a batch always does two things: (1) it
resets some state and fails the producer callback; (2) it reclaims the buffer
in the batch for reuse. This causes a problem when an in-flight batch hits the
delivery timeout, since the buffer can't be safely reclaimed while the request
is still in flight. There are two potential ways to fix this issue.
The first one is what this PR does. It doesn't fail the in-flight batch when
it hits the delivery timeout. Instead, it waits for the in-flight request to
hit the request timeout and then fails the batch (at that point, it's safe to
reclaim the buffer in the batch). The benefit of this approach is that the fix
is simple. The side effect is that, in some rare cases, a record could take up
to delivery timeout + request timeout to expire.
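
To make the side effect concrete, here is a rough sketch of the timing under
the first approach. The class and method names are hypothetical and not part
of the Kafka codebase; it also assumes no response ever arrives and that the
batch is still in flight when the delivery timeout passes.

```java
// Hypothetical illustration only; none of these names exist in Kafka.
class InflightExpirySketch {

    /**
     * Time at which the batch is failed under the first approach, assuming
     * it was sent at sentMs, is still in flight at the delivery deadline,
     * and the broker never responds.
     */
    static long failureTimeMs(long sentMs, long requestTimeoutMs) {
        // The batch is only failed once the in-flight request times out.
        return sentMs + requestTimeoutMs;
    }

    /** How far past the delivery deadline the failure lands. */
    static long overshootMs(long createdMs, long sentMs,
                            long deliveryTimeoutMs, long requestTimeoutMs) {
        long deliveryDeadlineMs = createdMs + deliveryTimeoutMs;
        if (sentMs > deliveryDeadlineMs) {
            throw new IllegalArgumentException(
                "batch would have expired before it was sent");
        }
        return failureTimeMs(sentMs, requestTimeoutMs) - deliveryDeadlineMs;
    }
}
```

With the default delivery.timeout.ms of 120000 and request.timeout.ms of
30000, a batch created at time 0 and sent right at 120000 would be failed at
150000 in this model, i.e. a full request timeout past the delivery deadline.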
The other way is to continue to fail the in-flight batch when it hits the
delivery timeout, but to do only item (1) above at that point. When the
in-flight request later times out, we do item (2) to reclaim the buffer. This
keeps the delivery timeout guarantee, but might be a more complex fix.
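
A minimal sketch of what splitting the two steps could look like. Everything
here is a hypothetical stand-in (Pool is not Kafka's BufferPool, Batch is not
ProducerBatch, and java.util.concurrent.TimeoutException stands in for Kafka's
own exception type); it only illustrates the ordering, not how Sender would
actually need to change.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these types exist in Kafka.
class TwoPhaseFailSketch {

    /** Stand-in for a buffer pool that hands buffers out for reuse. */
    static final class Pool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        void release(ByteBuffer buffer) {
            buffer.clear();
            free.push(buffer);
        }

        int available() {
            return free.size();
        }
    }

    /** Stand-in for a batch with a user callback and a pooled buffer. */
    static final class Batch {
        private final ByteBuffer buffer;
        private final Consumer<Exception> callback;
        private boolean completed;
        private boolean bufferReleased;

        Batch(ByteBuffer buffer, Consumer<Exception> callback) {
            this.buffer = buffer;
            this.callback = callback;
        }

        /** Item (1): fail the callback at most once; the buffer is untouched. */
        boolean failCallback(Exception cause) {
            if (completed) {
                return false;
            }
            completed = true;
            callback.accept(cause);
            return true;
        }

        /** Item (2): return the buffer to the pool at most once. */
        boolean releaseBuffer(Pool pool) {
            if (bufferReleased) {
                return false;
            }
            bufferReleased = true;
            pool.release(buffer);
            return true;
        }
    }

    /** Delivery timeout hit while in flight: only item (1). */
    static void onDeliveryTimeout(Batch batch) {
        batch.failCallback(new TimeoutException("delivery timeout"));
    }

    /** In-flight request timed out: the buffer is now safe to reclaim. */
    static void onRequestTimeout(Batch batch, Pool pool) {
        // No-op if the callback was already failed at the delivery timeout.
        batch.failCallback(new TimeoutException("request timeout"));
        batch.releaseBuffer(pool);
    }
}
```

The extra complexity comes from the batch now having an intermediate state
(callback failed, buffer not yet released) that every other code path touching
the batch would have to tolerate.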
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.