[ https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007664#comment-17007664 ]
Jonathan Santilli commented on KAFKA-9312:
------------------------------------------

Checking a little bit further, it seems like yes, it finishes, but if the batch was split, the _future_ gets chained:

{code:java}
@Override
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await(); // the await on the original result finishes here
    if (nextRecordMetadata != null)
        return nextRecordMetadata.get();
    return valueOrError();
}

...

/**
 * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the
 * future that has already returned to the users to wait on the newly created split batches even after the
 * old big batch has been deemed as done.
 */
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
{code}

And {{ProducerBatch#tryAppendForSplit}} calls {{thunk.future.chain(future);}}

So I think it is OK; I will create a test case to verify it.


> KafkaProducer flush behavior does not guarantee completed sends under record batch splitting
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>            Reporter: Lucas Bradstreet
>            Assignee: Jonathan Santilli
>            Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent at the time of the flush call will either be sent successfully or result in an error.
> The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE error from the broker. However, the flush behavior relies on the accumulator checking the incomplete sends that exist at the time of the flush call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
>     try {
>         for (ProducerBatch batch : this.incomplete.copyAll())
>             batch.produceFuture.await();
>     } finally {
>         this.flushesInProgress.decrementAndGet();
>     }
> }{code}
> When a large record batch is split, its produceFuture is completed and new batches are added to the incomplete list of record batches. This breaks the flush guarantee: awaitFlushCompletion will finish without awaiting the new split batches, while the pre-split batches being awaited on above will already have been completed.
> This is demonstrated in a test case that can be found at [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem has likely been present since record batch splitting was added in KAFKA-3995 (KIP-126, 0.11).
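To make the chaining behavior described in the comment concrete, here is a minimal, self-contained sketch. {{SimpleResult}} and {{ChainableFuture}} are hypothetical stand-ins for {{ProduceRequestResult}} and {{FutureRecordMetadata}}, not Kafka's actual classes; they only mimic the await/chain/get interplay that the bug hinges on:

{code:java}
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for ProduceRequestResult: a one-shot completion signal.
class SimpleResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    void done() { latch.countDown(); }
    void await() throws InterruptedException { latch.await(); }
}

// Hypothetical stand-in for FutureRecordMetadata, reduced to the chain logic.
class ChainableFuture {
    private final SimpleResult result = new SimpleResult();
    private ChainableFuture next; // plays the role of nextRecordMetadata

    void complete() { result.done(); }

    // Mirrors FutureRecordMetadata#chain: append to the end of the chain.
    void chain(ChainableFuture f) {
        if (next == null) next = f;
        else next.chain(f);
    }

    // Mirrors FutureRecordMetadata#get: after the original result completes,
    // follow the chain and wait on the split batches too.
    void get() throws InterruptedException {
        result.await();
        if (next != null) next.get();
    }

    // Mirrors what awaitFlushCompletion relies on: only the original result.
    void awaitOriginalOnly() throws InterruptedException {
        result.await();
    }
}

public class ChainDemo {
    public static void main(String[] args) throws Exception {
        ChainableFuture preSplit = new ChainableFuture();
        ChainableFuture splitBatch = new ChainableFuture();

        // The big batch is split: its future is chained to the new batch,
        // and the big batch itself is marked done.
        preSplit.chain(splitBatch);
        preSplit.complete();

        // Returns immediately -- the gap awaitFlushCompletion falls into.
        preSplit.awaitOriginalOnly();
        System.out.println("original result done, split batch still in flight");

        // get() follows the chain, so it blocks until the split batch is done.
        Thread waiter = new Thread(() -> {
            try {
                preSplit.get();
                System.out.println("get() returned after split batch completed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        splitBatch.complete();
        waiter.join();
    }
}
{code}

Run as-is, it first shows the original result completing while the split batch is still in flight, then shows {{get()}} blocking until the split batch completes as well.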
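On the application side, a defensive pattern follows from the same analysis: since {{get()}} follows the chained metadata even when batches are split, keeping the futures returned by {{send()}} and calling {{get()}} on each after {{flush()}} restores the intended guarantee. A hedged sketch; the broker address {{localhost:9092}} and topic name {{my-topic}} are placeholders:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushThenGet {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (int i = 0; i < 1000; i++)
                futures.add(producer.send(new ProducerRecord<>("my-topic", "k" + i, "v" + i)));

            // Per this issue, flush() may return before split batches are done.
            producer.flush();

            // get() follows nextRecordMetadata, so this also waits on any
            // batches created by splitting, and surfaces send errors.
            for (Future<RecordMetadata> f : futures)
                f.get();
        }
    }
}
{code}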