junrao commented on PR #20254: URL: https://github.com/apache/kafka/pull/20254#issuecomment-3152755901
@mjsax : Hmm, the splitting code has some logic to chain the response futures. When we append a record to a new batch after a split, we create a new future for the record and chain it to the old future.

```
private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
    ...
    FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                           timestamp,
                                                           key == null ? -1 : key.remaining(),
                                                           value == null ? -1 : value.remaining(),
                                                           Time.SYSTEM);
    // Chain the future to the original thunk.
    thunk.future.chain(future);
```

`FutureRecordMetadata.chain()` then adds the new future as a dependency.

```
void chain(FutureRecordMetadata futureRecordMetadata) {
    if (nextRecordMetadata == null)
        nextRecordMetadata = futureRecordMetadata;
    else
        nextRecordMetadata.chain(futureRecordMetadata);
}
```

`FutureRecordMetadata.get()` will then wait until all chained response futures complete.

```
public RecordMetadata get() throws InterruptedException, ExecutionException {
    this.result.await();
    if (nextRecordMetadata != null)
        return nextRecordMetadata.get();
    return valueOrError();
}
```

So, a pending `flush()` waiting for an original batch is not supposed to complete until all split batches have completed. Did you actually observe that `flush()` returns early?

@lianetm : Currently, `KafkaProducer.doSend()` only rejects a record if its size exceeds `max.request.size`. It's ok if a record is larger than `batch.size`; we will just create a larger new batch to accommodate it. So, during a split, it's reasonable to follow the same approach and allow the first record in a new batch even if it is larger than `batch.size`.
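To make the chaining argument concrete, here is a minimal standalone sketch of the same pattern. It is not the Kafka code: `ChainedFutureSketch`, `ChainedFuture`, `complete()` and `runScenario()` are hypothetical names, a `CountDownLatch` stands in for the produce result, and it assumes the split future is chained before the original result is completed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ChainedFutureSketch {

    // Hypothetical stand-in for FutureRecordMetadata: one latch plus an optional next link.
    static final class ChainedFuture {
        private final String label;
        private final CountDownLatch result = new CountDownLatch(1);
        private volatile ChainedFuture next;

        ChainedFuture(String label) {
            this.label = label;
        }

        // Same shape as chain() quoted above: append to the end of the chain.
        void chain(ChainedFuture future) {
            if (next == null)
                next = future;
            else
                next.chain(future);
        }

        // Marks this link as finished (hypothetical helper, not a Kafka method).
        void complete() {
            result.countDown();
        }

        // Same shape as get() quoted above: wait for this link, then for every later link.
        String get() throws InterruptedException {
            result.await();
            if (next != null)
                return next.get();
            return label;
        }
    }

    // Returns "<waiter returned before the split completed>:<label seen by the waiter>".
    static String runScenario() {
        ChainedFuture original = new ChainedFuture("original-batch");
        ChainedFuture split = new ChainedFuture("split-batch");
        original.chain(split);   // assumption: chaining happens first
        original.complete();     // then the original batch's result is completed

        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch returned = new CountDownLatch(1);
        Thread waiter = new Thread(() -> {
            try {
                seen.set(original.get());
                returned.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.setDaemon(true);
        waiter.start();

        try {
            // The waiter cannot return yet because the split link is still pending.
            boolean returnedEarly = returned.await(200, TimeUnit.MILLISECONDS);
            split.complete();
            if (!returned.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("waiter did not return after the split completed");
            return returnedEarly + ":" + seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

In this sketch, a caller blocked in `original.get()` stays blocked after the original link completes and only returns once the split link completes, which is the behavior a pending `flush()` is expected to rely on.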