[ https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Bradstreet updated KAFKA-9312:
------------------------------------
    Description: 
The KafkaProducer flush call guarantees that all records sent at the time of the 
flush call will either be sent successfully or result in an error.

The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE 
error from the broker. However, the flush behavior relies on the accumulator 
checking only the incomplete sends that exist at the time of the flush call.
{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}{code}
When a large record batch is split, its produceFuture is completed and new 
batches are added to the incomplete list of record batches. This breaks the 
flush guarantee: awaitFlushCompletion will return without awaiting the new split 
batches, because the pre-split batches it is awaiting have already been 
completed.

This is demonstrated in a test case that can be found at 
[https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
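
The race can also be shown with a minimal standalone sketch (this is not Kafka 
code; the Batch class, the incomplete set, and all names below are illustrative 
stand-ins for the accumulator's structures):
{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: flushing awaits only a snapshot of the incomplete set, so batches
// added by a MESSAGE_TOO_LARGE split after the snapshot are never awaited.
public class FlushSplitSketch {
    static class Batch {
        final CompletableFuture<Void> produceFuture = new CompletableFuture<>();
    }

    static final Set<Batch> incomplete = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) throws Exception {
        Batch big = new Batch();
        incomplete.add(big);

        // Flush takes its snapshot before the split happens: it sees only `big`.
        Set<Batch> snapshot = Set.copyOf(incomplete);

        // Simulated broker-response thread: the oversized batch is "split",
        // two new batches join the incomplete set, and the pre-split batch's
        // future completes.
        Batch splitA = new Batch();
        Batch splitB = new Batch();
        Thread splitter = new Thread(() -> {
            incomplete.add(splitA);
            incomplete.add(splitB);
            incomplete.remove(big);
            big.produceFuture.complete(null);
        });
        splitter.start();
        splitter.join();

        // The flush-style await over the snapshot returns immediately...
        for (Batch b : snapshot)
            b.produceFuture.join();

        // ...yet the split batches are still in flight.
        System.out.println("flush returned; still incomplete: " + incomplete.size());
    }
}
{code}
Running this prints a non-zero incomplete count after the snapshot-based await 
returns, which mirrors how awaitFlushCompletion can finish before the split 
batches complete.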

  was:
The KafkaProducer flush call guarantees that all records sent at the time of the 
flush call will either be sent successfully or result in an error.

The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE 
error from the broker. However, the flush behavior relies on the accumulator 
checking only the incomplete sends that exist at the time of the flush call.
{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}{code}
When a large record batch is split, its produceFuture is completed and new 
batches are added to the incomplete list of record batches. This breaks the 
flush guarantee, as awaitFlushCompletion will return without awaiting the 
corresponding batches.

This is demonstrated in a test case that can be found at 
[https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]


> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0
>            Reporter: Lucas Bradstreet
>            Priority: Major
>
> The KafkaProducer flush call guarantees that all records sent at the time of 
> the flush call will either be sent successfully or result in an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However, the flush behavior relies 
> on the accumulator checking only the incomplete sends that exist at the time 
> of the flush call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
>     try {
>         for (ProducerBatch batch : this.incomplete.copyAll())
>             batch.produceFuture.await();
>     } finally {
>         this.flushesInProgress.decrementAndGet();
>     }
> }{code}
> When a large record batch is split, its produceFuture is completed and new 
> batches are added to the incomplete list of record batches. This breaks the 
> flush guarantee: awaitFlushCompletion will return without awaiting the new 
> split batches, because the pre-split batches it is awaiting have already been 
> completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)