shashankhs11 commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2409115717


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1072,12 +1073,25 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            // Obtain a copy of all of the incomplete ProduceRequestResult(s) 
at the time of the flush.
-            // We must be careful not to hold a reference to the 
ProduceBatch(s) so that garbage
-            // collection can occur on the contents.
-            // The sender will remove ProducerBatch(s) from the original 
incomplete collection.
-            for (ProduceRequestResult result : 
this.incomplete.requestResults())
-                result.await();
+            // Obtain a snapshot of all record futures at the time of the 
flush.
+            // We wait on individual record futures rather than batch-level 
futures because
+            // by waiting on record futures, we ensure flush() blocks until 
all split
+            // batches complete.
+            //
+            // We first collect all futures into a list first to avoid holding 
references to

Review Comment:
   Done in 616e5fe



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to