junrao commented on code in PR #21065:
URL: https://github.com/apache/kafka/pull/21065#discussion_r2658498977


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -354,6 +363,39 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
         return false;
     }
 
+    private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateNow) {
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+        // we need to reset the producer id here.
+        if (!expiredBatches.isEmpty())
+            log.trace("Expired {} batches in accumulator", expiredBatches.size());
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
+            failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateNow);
+            if (transactionManager != null && expiredBatch.inRetry()) {
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch);
+            }
+        }
+    }
+
+    private void deallocateRequestTimeoutBatches(long now) {
+        Iterator<ProducerBatch> iter = expiredInflightBatchesAwaitingDeallocation.iterator();
+        while (iter.hasNext()) {
+            ProducerBatch batch = iter.next();
+            if (batch.hasReachedTimeout(requestTimeoutMs, now)) {

Review Comment:
   Each inflight batch has a pending RPC request in NetworkClient, so it's probably better to deallocate the buffer when that request completes, through either the failBatch() or completeBatch() call. I am thinking that we can add an AtomicBoolean field, deallocated, to ProducerBatch and a new param, deallocateBuffer, to failBatch(). When calling failBatch() on the inflight batches, we set deallocateBuffer to false; in all other calls to failBatch(), we set it to true. Then, in completeBatch() and in failBatch() (if deallocateBuffer is true), we deallocate the buffer if deallocated is false and mark deallocated as true.
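
To make the suggestion concrete, here is a minimal, self-contained sketch of the deallocate-once idea. It is not the actual Sender or ProducerBatch code: the classes Batch, CountingPool and SenderSketch and the helpers markDeallocated() and maybeDeallocate() are hypothetical stand-ins, the failBatch() signature is simplified, and using compareAndSet is just one way to implement "deallocate if deallocated is false, then mark it true".

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DeallocateOnceSketch {

    /** Hypothetical stand-in for the buffer pool; it only counts deallocations. */
    static class CountingPool {
        final AtomicInteger deallocations = new AtomicInteger();

        void deallocate(Batch batch) {
            deallocations.incrementAndGet();
        }
    }

    /** Hypothetical stand-in for ProducerBatch, reduced to the proposed flag. */
    static class Batch {
        private final AtomicBoolean deallocated = new AtomicBoolean(false);

        /** Returns true only for the first caller, so the buffer is released at most once. */
        boolean markDeallocated() {
            return deallocated.compareAndSet(false, true);
        }
    }

    /** Hypothetical stand-in for the relevant parts of Sender. */
    static class SenderSketch {
        final CountingPool pool;

        SenderSketch(CountingPool pool) {
            this.pool = pool;
        }

        // deallocateBuffer is false for expired inflight batches, whose RPC is still pending.
        void failBatch(Batch batch, RuntimeException exception, boolean deallocateBuffer) {
            // Error propagation to the caller is omitted in this sketch.
            if (deallocateBuffer)
                maybeDeallocate(batch);
        }

        // Called when the response for the batch arrives.
        void completeBatch(Batch batch) {
            maybeDeallocate(batch);
        }

        private void maybeDeallocate(Batch batch) {
            if (batch.markDeallocated())
                pool.deallocate(batch);
        }
    }

    public static void main(String[] args) {
        CountingPool pool = new CountingPool();
        SenderSketch sender = new SenderSketch(pool);
        Batch inflight = new Batch();

        // Expired while inflight: fail the batch but keep the buffer until the RPC completes.
        sender.failBatch(inflight, new RuntimeException("expired"), false);
        // The pending request completes later and releases the buffer.
        sender.completeBatch(inflight);
        // Any further attempt does not release the buffer a second time.
        sender.failBatch(inflight, new RuntimeException("late failure"), true);

        System.out.println("deallocations = " + pool.deallocations.get());
    }
}
```

With something along these lines, the buffer of an expired inflight batch would go back to the pool only once its request has resolved, and a separate sweep over expired inflight batches may not be needed.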



