junrao commented on PR #20254:
URL: https://github.com/apache/kafka/pull/20254#issuecomment-3363498212

   @mjsax : Took a closer look. This is indeed a bug in the producer. 
ProducerBatch has a produceFuture of type ProduceRequestResult and a list of 
Thunk, one for each produced record. Each Thunk has a future of type 
FutureRecordMetadata. When we split a ProducerBatch, we have logic to chain 
each Thunk.future to the newly split batches, which ensures that 
producer.send() is not unblocked prematurely. The problem is that flush() 
doesn't wait on Thunk.future; instead, it waits on 
ProducerBatch.produceFuture. After splitting, we immediately call 
ProducerBatch.produceFuture.done(), which unblocks the flush() call 
prematurely, since the split batches haven't been completed yet.
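   
   To make the premature unblock concrete, here is a minimal self-contained 
sketch (class and method names are ours for illustration, not Kafka's) of what 
a flush() waiting on the original batch-level result observes right after a 
split:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Minimal model (names ours, not Kafka's) of the premature flush() unblock:
   // on split, the original batch-level result is completed right away, while
   // the split batches are still in flight.
   public class SplitFlushBug {
       // Stands in for ProduceRequestResult.
       static class BatchResult {
           final CountDownLatch latch = new CountDownLatch(1);
           void done() { latch.countDown(); }
           boolean isDone() { return latch.getCount() == 0; }
       }
   
       public static void main(String[] args) {
           BatchResult original = new BatchResult(); // the oversized batch
           BatchResult splitA = new BatchResult();   // first split batch
           BatchResult splitB = new BatchResult();   // second split batch
   
           // Splitting immediately completes the original batch's result ...
           original.done();
   
           // ... so flush(), which waits on the original result, would return
           // now, even though no split batch has been acknowledged yet.
           System.out.println("flushUnblocked=" + original.isDone());
           System.out.println("recordsDone=" + (splitA.isDone() && splitB.isDone()));
       }
   }
   ```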
   
   As for the fix, one way is to change the logic in 
RecordAccumulator.awaitFlushCompletion(). Instead of doing the following,
   ```
   for (ProduceRequestResult result : this.incomplete.requestResults())
       result.await();
   ```
   we collect the thunks in each incomplete batch and wait on each thunk's 
FutureRecordMetadata. That way, the chaining logic for FutureRecordMetadata 
kicks in, and flush() only returns once the split batches have completed.
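   
   As a sanity check on that approach, here is a self-contained sketch (again 
with simplified classes of our own; the real chaining lives in Kafka's 
FutureRecordMetadata) showing that awaiting the chained per-record future 
blocks until the split batch itself completes:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Simplified model (names ours) of the proposed fix: flush() awaits each
   // record's chained future instead of the batch-level result, so it only
   // unblocks once the split batches themselves complete.
   public class AwaitThunksSketch {
       static class BatchResult {
           final CountDownLatch latch = new CountDownLatch(1);
           void done() { latch.countDown(); }
       }
   
       // Stands in for FutureRecordMetadata: may be chained to a successor.
       static class RecordFuture {
           final BatchResult result;
           volatile RecordFuture nextFuture; // set when the batch is split
   
           RecordFuture(BatchResult result) { this.result = result; }
   
           void await() throws InterruptedException {
               result.latch.await();
               if (nextFuture != null)  // follow the chain into the split batch
                   nextFuture.await();
           }
       }
   
       public static void main(String[] args) throws Exception {
           BatchResult original = new BatchResult();
           BatchResult split = new BatchResult();
   
           RecordFuture recordFuture = new RecordFuture(original);
           // Splitting chains the record's future onto the new batch ...
           recordFuture.nextFuture = new RecordFuture(split);
           // ... and immediately completes the original batch-level result.
           original.done();
   
           // A flush() that waits on the record future stays blocked, because
           // the split batch is still in flight.
           Thread flusher = new Thread(() -> {
               try { recordFuture.await(); } catch (InterruptedException e) { }
           });
           flusher.start();
           flusher.join(200);
           System.out.println("blocked=" + flusher.isAlive());
   
           split.done();          // broker acknowledges the split batch
           flusher.join();
           System.out.println("done=" + !flusher.isAlive());
       }
   }
   ```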
   
   As for why the splitting loops forever, this is probably because of a 
recently fixed bug in https://github.com/apache/kafka/pull/20358/files.
   
   @shashankhs11 : Would you be willing to submit a patch?
   


-- 
This is an automated message from the Apache Git Service.