junrao commented on PR #20254:
URL: https://github.com/apache/kafka/pull/20254#issuecomment-3349188285

   @mjsax 
   
   > Expected behavior would be, that producer.flush() blocks, and eventual 
fails with a TimeoutException hitting delivery.timeout.ms.
   
   If a record cannot be delivered within delivery.timeout.ms, Sender calls 
failBatch(), which eventually calls ProduceRequestResult.done(). This unblocks 
Producer.flush(), but flush() won't fail with a TimeoutException since it doesn't 
check the error in ProduceRequestResult. Does KStreams depend on 
Producer.flush() failing? It seems that KStreams needs to check the future 
returned by the send() call to know whether the record was sent successfully.
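
As a rough sketch of the point above, here is a toy model that uses only java.util.concurrent. ToyResult and FlushVsFutureDemo are hypothetical names, not the real producer classes, and java.util.concurrent.TimeoutException stands in for Kafka's own TimeoutException. It only illustrates the shape of the behavior: completion releases the flush-style waiter regardless of outcome, while the error is visible only through the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

final class FlushVsFutureDemo {
    // Returns the failure observed through the future, or null if the send succeeded.
    static Throwable failureSeenByFuture(ToyResult r) {
        try {
            r.future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ToyResult r = new ToyResult();
        // Simulate the batch being failed after the delivery timeout.
        r.done(new TimeoutException("delivery timeout expired (simulated)"), -1L);
        r.awaitCompletion(); // returns normally even though the send failed
        System.out.println("flush-style wait returned; future saw: " + failureSeenByFuture(r));
    }
}

// Toy stand-in for a produce result (assumption: not Kafka's ProduceRequestResult).
final class ToyResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    final CompletableFuture<Long> future = new CompletableFuture<>();

    // Completes the result; waiters are released on success and on failure alike.
    void done(Exception error, long offset) {
        if (error != null) {
            future.completeExceptionally(error);
        } else {
            future.complete(offset);
        }
        latch.countDown();
    }

    // Models a flush-style wait: blocks until completion and never inspects the error.
    void awaitCompletion() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In this model, a caller that only waits for completion cannot tell a failed send from a successful one; it has to inspect the future (or a callback) per record.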
   
   >  Or is this by design, and if yes, why?), as there could also be the case 
of a single large message that cannot be split.
   
   We have the following code in Sender. It only tries to split the batch if it 
has more than 1 record, so the splitting shouldn't go on forever. It does have 
the issue that when a split batch is created, it gets a new createMs, which 
inadvertently extends the timeout.
   
   ```
       private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                  long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
           Errors error = response.error;
   
           if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
                   (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
               ...
               this.accumulator.splitAndReenqueue(batch);
   ```
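
To make the timing concern concrete, here is a toy model of the two points above: a single-record batch is never split, and a split half that gets a fresh creation time outlives the original delivery deadline. SplitTimeoutDemo, ToyBatch, and the split(nowMs, keepCreatedMs) helper are hypothetical and only illustrate the arithmetic; they are not the actual ProducerBatch or accumulator code.

```java
import java.util.List;

final class SplitTimeoutDemo {
    // Hypothetical stand-in for a producer batch, holding only what the timing argument needs.
    static final class ToyBatch {
        final int recordCount;
        final long createdMs;

        ToyBatch(int recordCount, long createdMs) {
            this.recordCount = recordCount;
            this.createdMs = createdMs;
        }

        // True once the batch has been alive for at least deliveryTimeoutMs.
        boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long nowMs) {
            return nowMs - createdMs >= deliveryTimeoutMs;
        }

        // Splits into two halves. A single-record batch is returned unchanged,
        // which is why splitting cannot go on forever.
        List<ToyBatch> split(long nowMs, boolean keepCreatedMs) {
            if (recordCount <= 1) {
                return List.of(this);
            }
            long halfCreatedMs = keepCreatedMs ? createdMs : nowMs;
            int first = recordCount / 2;
            return List.of(new ToyBatch(first, halfCreatedMs),
                           new ToyBatch(recordCount - first, halfCreatedMs));
        }
    }

    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000L; // assumed value for the example
        ToyBatch original = new ToyBatch(4, 0L);

        // The batch is rejected as too large and split 100 s after creation.
        List<ToyBatch> fresh = original.split(100_000L, false);
        List<ToyBatch> inherited = original.split(100_000L, true);

        long nowMs = 130_000L; // 10 s past the original deadline
        System.out.println("fresh createdMs expired?     "
                + fresh.get(0).hasReachedDeliveryTimeout(deliveryTimeoutMs, nowMs));
        System.out.println("inherited createdMs expired? "
                + inherited.get(0).hasReachedDeliveryTimeout(deliveryTimeoutMs, nowMs));
    }
}
```

In this model, carrying the original creation time over to the split halves keeps the overall deadline fixed, whereas stamping them with the split time restarts the clock.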
   
   > I did set a breakpoint somewhere in the producer to see what it's doing, 
and while it was retrying the send internally it did unblock flush() before the 
send was done, and producer.flush() call did return.
   
   Do you have a test that can reproduce this? 

