junrao commented on PR #20254:
URL: https://github.com/apache/kafka/pull/20254#issuecomment-3152755901

   @mjsax : Hmm, the splitting code has some logic to chain the response 
futures. When we append a record to a new batch after a split, we create a new 
future for the record and chain it to the old future.
   
   ```
       private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) {
   ...
   
               FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                      timestamp,
                                                                      key == null ? -1 : key.remaining(),
                                                                      value == null ? -1 : value.remaining(),
                                                                      Time.SYSTEM);
               // Chain the future to the original thunk.
               thunk.future.chain(future);
   ```
   
   `FutureRecordMetadata.chain()` then adds the new future as a dependency.
   ```
       void chain(FutureRecordMetadata futureRecordMetadata) {
           if (nextRecordMetadata == null)
               nextRecordMetadata = futureRecordMetadata;
           else
               nextRecordMetadata.chain(futureRecordMetadata);
       }
   ```
   
   `FutureRecordMetadata.get()` will then wait until all chained response 
futures complete.
   ```
       public RecordMetadata get() throws InterruptedException, ExecutionException {
           this.result.await();
           if (nextRecordMetadata != null)
               return nextRecordMetadata.get();
           return valueOrError();
       }
   ```
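   
   For illustration, here is a stripped-down, self-contained model of that chaining (a hypothetical `ChainedFuture` class, not the real `FutureRecordMetadata`), showing why `get()` on the original future returns only once every future in the chain has completed:
   
   ```
   import java.util.concurrent.CountDownLatch;
   
   class ChainedFuture {
       private final CountDownLatch result = new CountDownLatch(1);
       private ChainedFuture next;
   
       void complete() {
           result.countDown();
       }
   
       // Mirrors chain(): new futures are appended to the tail of the chain.
       void chain(ChainedFuture f) {
           if (next == null)
               next = f;
           else
               next.chain(f);
       }
   
       // Mirrors get(): wait for this result, then delegate to the rest of the chain.
       void get() throws InterruptedException {
           result.await();
           if (next != null)
               next.get();
       }
   }
   
   public class ChainDemo {
       public static void main(String[] args) throws InterruptedException {
           ChainedFuture original = new ChainedFuture();
           ChainedFuture split1 = new ChainedFuture();
           ChainedFuture split2 = new ChainedFuture();
           original.chain(split1); // first split
           original.chain(split2); // second split is appended after split1
   
           original.complete();
           split1.complete();
           // original.get() would still block here until split2 completes.
           split2.complete();
           original.get(); // returns only now, after the whole chain is done
           System.out.println("all chained futures completed");
       }
   }
   ```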
   
   So, a pending `flush()` waiting for an original batch is not supposed to 
complete until all split batches have completed. Did you actually observe that 
`flush()` returns early?
   
   @lianetm : Currently, `KafkaProducer.doSend()` only rejects a record if its 
size is > `max.request.size`. It's ok for a record to be larger than 
`batch.size`; we just create a larger new batch to accommodate it. So, during a 
split, it's reasonable to follow the same approach and always allow the first 
record into a new batch. 
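   
   As a concrete sketch (assuming a local broker at `localhost:9092` and a hypothetical topic name), a record larger than `batch.size` but below `max.request.size` is accepted and simply gets its own, larger batch:
   
   ```
   import java.util.Properties;
   
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.ByteArraySerializer;
   
   public class LargeRecordDemo {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
           props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);         // 16 KB (the default)
           props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576); // 1 MB (the default)
   
           try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
               // 64 KB value: larger than batch.size, but well below max.request.size,
               // so doSend() accepts it and the accumulator allocates a bigger batch.
               byte[] value = new byte[64 * 1024];
               producer.send(new ProducerRecord<>("test-topic", value));
               producer.flush();
           }
       }
   }
   ```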
   

