junrao commented on code in PR #19489:
URL: https://github.com/apache/kafka/pull/19489#discussion_r2644024076


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -174,43 +174,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
         this.accumulator.deallocate(batch);
     }
 
-    /**
-     *  Get the in-flight batches that has reached delivery timeout.
-     */
-    private List<ProducerBatch> getExpiredInflightBatches(long now) {

Review Comment:
   In the current implementation, when failing a batch, we always (1) reset
some state and fail the producer callback, and (2) reclaim the buffer in the
batch for reuse. This causes a problem when failing an in-flight batch that hits
the delivery timeout, since we can't reclaim the batch's buffer while the
in-flight request may still be using it. There are two potential ways to fix
this issue.
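
To make the problem concrete, here is a toy Java model. It is a sketch, not Kafka code: `SimplePool`, `SimpleBatch` and `BufferReuseDemo` are hypothetical names. In this model, failing a batch always does (1) and (2) together. It assumes the in-flight send still holds a view of the batch's buffer. Once that buffer goes back to the pool and is handed to a new batch, the in-flight view reads the new batch's bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, heavily simplified stand-in for the producer's buffer pool.
class SimplePool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    ByteBuffer allocate(int size) {
        ByteBuffer buf = free.pollFirst();
        // Reuse a pooled buffer when one is available, otherwise allocate.
        return buf != null ? buf : ByteBuffer.allocate(size);
    }

    void deallocate(ByteBuffer buf) {
        buf.clear();        // resets position/limit; the old bytes stay in place
        free.addFirst(buf); // the same memory goes to the next allocate()
    }
}

// Hypothetical stand-in for a producer batch.
class SimpleBatch {
    final ByteBuffer buffer;
    boolean callbackFailed = false;

    SimpleBatch(ByteBuffer buffer, String payload) {
        this.buffer = buffer;
        buffer.put(payload.getBytes(StandardCharsets.UTF_8));
    }

    // Current behavior as described above: (1) and (2) always happen together.
    void failAndRelease(SimplePool pool) {
        callbackFailed = true;   // (1) reset state / fail the callback
        pool.deallocate(buffer); // (2) reclaim the buffer for reuse
    }
}

class BufferReuseDemo {
    // Returns what an "in-flight send" of batch A reads after A is failed on
    // the delivery timeout and a new batch B is allocated from the pool.
    static String bytesSeenByInflightSend() {
        SimplePool pool = new SimplePool();
        SimpleBatch a = new SimpleBatch(pool.allocate(16), "topic-A");
        // Shares A's memory; stands in for the bytes the network is still sending.
        ByteBuffer inflightView = a.buffer.duplicate();
        a.failAndRelease(pool);                        // delivery timeout hit
        new SimpleBatch(pool.allocate(16), "topic-B"); // reuses A's memory
        byte[] out = new byte[7];
        inflightView.position(0);
        inflightView.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }
}
```

In this model the in-flight view of batch A ends up reading `topic-B`, which is exactly the kind of reuse the buffer must be protected from.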
   
   The first one is what this PR does. It doesn't fail the in-flight batch when
it hits the delivery timeout. Instead, it waits for the in-flight batch to hit
the request timeout and then fails the batch (at that point, it's safe to
reclaim the buffer in the batch). The benefit of this approach is that the fix
is simple. The side effect is that, in some rare cases, a record could take
delivery timeout + request timeout to expire.
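
The sketch below shows the expiry rule for the first approach. It assumes timestamps in milliseconds and uses illustrative method names. It is not the actual `Sender` code. It also shows where the "delivery timeout + request timeout" bound comes from. A batch sent just before its delivery timeout elapses is only failed once its request times out.

```java
// Hypothetical sketch of the first approach's expiry rule (illustrative names,
// not the actual Sender code). All times are in milliseconds.
class InflightExpiry {
    static boolean shouldFail(boolean inflight, long createdMs, long sentMs, long nowMs,
                              long deliveryTimeoutMs, long requestTimeoutMs) {
        if (inflight) {
            // Ignore the delivery timeout; wait until the request itself times
            // out, at which point reclaiming the buffer is safe.
            return nowMs - sentMs >= requestTimeoutMs;
        }
        // Batches that are not in flight expire on the delivery timeout as before.
        return nowMs - createdMs >= deliveryTimeoutMs;
    }

    // Upper bound on time-to-failure: the batch is sent just before its delivery
    // timeout elapses, then its request runs for the full request timeout.
    static long worstCaseMs(long deliveryTimeoutMs, long requestTimeoutMs) {
        return deliveryTimeoutMs + requestTimeoutMs;
    }
}
```

Consider the producer defaults `delivery.timeout.ms=120000` and `request.timeout.ms=30000`. In that case, the rare worst case is about 150 seconds instead of 120.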
   
   Another way is to continue to fail the in-flight batch when it hits the
delivery timeout, but only do item (1) above at that point. When the in-flight
request times out, we then do item (2) to reclaim the buffer. This keeps the
delivery timeout guarantee, but is a more complex fix.
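
Here is a minimal sketch of the second approach. It assumes a batch knows whether it is in flight, and `TwoPhaseBatch` and its methods are hypothetical. The callback (item 1) fires once, on the delivery timeout. The buffer (item 2) is released only when no request can still reference it. Retries are out of scope for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the second approach: failing a batch is split into
// item (1), failing the callback, and item (2), releasing the buffer.
// These names are illustrative only; they are not Kafka APIs.
class TwoPhaseBatch {
    private final boolean inflight;    // is a produce request using the buffer?
    private boolean completed = false; // item (1) done: callback fired
    private boolean released = false;  // item (2) done: buffer reclaimed
    final List<String> events = new ArrayList<>();

    TwoPhaseBatch(boolean inflight) {
        this.inflight = inflight;
    }

    // Delivery timeout hit: always fail the callback, but only reclaim the
    // buffer if no in-flight request can still reference it.
    void onDeliveryTimeout() {
        completeExceptionally("delivery timeout");
        if (!inflight) {
            release();
        }
    }

    // In-flight request timed out: now it is safe to reclaim the buffer.
    // If the callback already fired on the delivery timeout, this only releases.
    void onRequestTimeout() {
        completeExceptionally("request timeout");
        release();
    }

    private void completeExceptionally(String reason) {
        if (completed) {
            return; // the user callback must fire exactly once
        }
        completed = true;
        events.add("callback:" + reason);
    }

    private void release() {
        if (released) {
            return; // never return the same buffer to the pool twice
        }
        released = true;
        events.add("release");
    }
}
```

The two guard flags are what make this fix more involved. Every path that completes or deallocates a batch has to respect them, so neither the callback nor the deallocation runs twice.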



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
