junrao commented on PR #20254:
URL: https://github.com/apache/kafka/pull/20254#issuecomment-3363498212
@mjsax : Took a closer look. This is indeed a bug in the producer.
ProducerBatch has a produceFuture of type ProduceRequestResult and a list of
Thunk, one for each produced record. Each Thunk has a future of type
FutureRecordMetadata. When we split a ProducerBatch, there is logic to chain
each Thunk.future to the newly split batches. This ensures that we don't
unblock producer.send() prematurely. But the problem is that flush() doesn't wait
on Thunk.future. Instead, it waits on ProducerBatch.produceFuture. After
splitting, we immediately call ProducerBatch.produceFuture.done(). This
unblocks the flush() call prematurely, since the split batches haven't been
completed yet.
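To illustrate the race, here is a minimal toy model using plain CompletableFuture (the class names and futures below are illustrative stand-ins, not the real ProduceRequestResult / FutureRecordMetadata):

```java
import java.util.concurrent.CompletableFuture;

public class SplitFlushModel {
    public static void main(String[] args) {
        // Batch-level future (stands in for ProduceRequestResult) and a
        // record-level future (stands in for a Thunk's FutureRecordMetadata).
        CompletableFuture<Void> batchFuture = new CompletableFuture<>();
        CompletableFuture<Void> recordFuture = new CompletableFuture<>();

        // On split, the record future is chained to the new batch's future...
        CompletableFuture<Void> splitBatchFuture = new CompletableFuture<>();
        splitBatchFuture.thenRun(() -> recordFuture.complete(null));

        // ...while the original batch-level future is completed right away.
        batchFuture.complete(null);

        // flush() waits only on the batch-level future, so it would return
        // now, even though the record has not actually been produced yet.
        System.out.println("batch done: " + batchFuture.isDone());   // true
        System.out.println("record done: " + recordFuture.isDone()); // false

        // Only once the split batch completes is the record really done.
        splitBatchFuture.complete(null);
        System.out.println("record done: " + recordFuture.isDone()); // true
    }
}
```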
As for the fix, one way is to change the logic in
RecordAccumulator.awaitFlushCompletion(). Instead of doing the following,
```
for (ProduceRequestResult result : this.incomplete.requestResults())
    result.await();
```
we collect all thunks in each incomplete batch and wait on each thunk's
FutureRecordMetadata. This way, the chaining logic for FutureRecordMetadata
will kick in.
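A rough sketch of that fixed loop, with simplified stand-in types in place of the real ProducerBatch/Thunk/IncompleteBatches classes (this is an assumption of the shape of the fix, not the actual patch):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for a Thunk; the real one holds a FutureRecordMetadata.
class Thunk {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

// Simplified stand-in for ProducerBatch, exposing its per-record thunks.
class ProducerBatch {
    final List<Thunk> thunks;
    ProducerBatch(List<Thunk> thunks) { this.thunks = thunks; }
}

public class AwaitFlushSketch {
    // Instead of awaiting each batch's ProduceRequestResult, wait on every
    // record-level future, so the chaining done on split is honored.
    static void awaitFlushCompletion(List<ProducerBatch> incomplete) throws Exception {
        for (ProducerBatch batch : incomplete)
            for (Thunk thunk : batch.thunks)
                thunk.future.get();
    }

    public static void main(String[] args) throws Exception {
        Thunk thunk = new Thunk();
        ProducerBatch batch = new ProducerBatch(List.of(thunk));
        // Complete the record-level future from another thread; only then
        // does awaitFlushCompletion() return.
        CompletableFuture.runAsync(() -> thunk.future.complete(null));
        awaitFlushCompletion(List.of(batch));
        System.out.println("flush completed");
    }
}
```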
As for why the splitting loops forever, this is probably because of a
recently fixed bug in https://github.com/apache/kafka/pull/20358/files.
@shashankhs11 : Would you be willing to submit a patch?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]