shashankhs11 commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2409093988
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1072,12 +1073,25 @@ private boolean appendsInProgress() {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
- // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
- // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
- // collection can occur on the contents.
- // The sender will remove ProducerBatch(s) from the original incomplete collection.
- for (ProduceRequestResult result : this.incomplete.requestResults())
- result.await();
+ // Obtain a snapshot of all record futures at the time of the flush.
+ // We wait on individual record futures rather than batch-level futures because
+ // by waiting on record futures, we ensure flush() blocks until all split
+ // batches complete.
+ //
+ // We first collect all futures into a list first to avoid holding references to
+ // ProducerBatch objects, allowing them to be garbage collected after completion.
+ List<FutureRecordMetadata> futures = new ArrayList<>();
+ for (ProducerBatch batch : this.incomplete.copyAll()) {
Review Comment:
`requestResults()` doesn't expose record-level futures; they live in the private `thunks` list.
That's why I used `copyAll()` to get the actual `ProducerBatch` objects directly and then extract the record futures from them.
Is my approach correct?
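For discussion, here is a minimal, self-contained sketch of the snapshot-then-await pattern described above. `Batch`, `Incomplete`, and `awaitFlushCompletion` are hypothetical stand-ins for `ProducerBatch`, the accumulator's incomplete-batch set, and the real method; the sketch uses plain `CompletableFuture` instead of Kafka's internal `FutureRecordMetadata` and does not model batch splitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FlushSketch {

    // Hypothetical stand-in for ProducerBatch: one future per appended record.
    static final class Batch {
        final List<CompletableFuture<Long>> recordFutures = new ArrayList<>();

        CompletableFuture<Long> append() {
            CompletableFuture<Long> future = new CompletableFuture<>();
            recordFutures.add(future);
            return future;
        }
    }

    // Hypothetical stand-in for the accumulator's set of incomplete batches.
    static final class Incomplete {
        private final List<Batch> batches = new ArrayList<>();

        synchronized void add(Batch batch) {
            batches.add(batch);
        }

        // Returns a copy so callers never iterate the live collection.
        synchronized List<Batch> copyAll() {
            return new ArrayList<>(batches);
        }
    }

    // Snapshot every record future, then block on each one in turn.
    // The batch copy is only referenced by the first loop, so while waiting
    // this method retains the futures, not the Batch objects.
    static int awaitFlushCompletion(Incomplete incomplete) throws InterruptedException {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (Batch batch : incomplete.copyAll()) {
            futures.addAll(batch.recordFutures);
        }
        for (CompletableFuture<Long> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                // In this sketch a failed record still counts as completed;
                // the error remains available on that record's own future.
            }
        }
        return futures.size();
    }
}
```

As in the diff, the copy returned by `copyAll()` is never stored in a field or long-lived local, so once the futures are collected the flush path itself no longer keeps the batches reachable.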