[
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Bradstreet updated KAFKA-9312:
------------------------------------
Description:
The KafkaProducer flush call guarantees that all records sent at the time of
the flush call will either be sent successfully or result in an error.
The KafkaProducer splits record batches upon receiving a MESSAGE_TOO_LARGE
error from the broker. However, the flush behavior relies on the accumulator
checking only the incomplete sends that exist at the time of the flush call.
{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}{code}
When a large record batch is split, its produceFuture is completed and the
new, smaller batches are added to the incomplete list of record batches. This
breaks the flush guarantee: awaitFlushCompletion returns without awaiting the
new split batches, because every pre-split batch it snapshotted will already
have been completed.
This is demonstrated in a test case that can be found at
[https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
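The interleaving can be modeled with a minimal, self-contained sketch. This is a hypothetical stand-in for the real client code: Batch, the list, and splitAndReenqueue here merely mimic ProducerBatch, the accumulator's incomplete-batch collection, and RecordAccumulator.splitAndReenqueue, so the snapshot-then-split ordering is visible in one thread:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the flush/split race, not the actual producer code.
public class FlushRaceSketch {

    // Stand-in for ProducerBatch: just a completable future.
    static class Batch {
        final CountDownLatch produceFuture = new CountDownLatch(1);
        void complete() { produceFuture.countDown(); }
    }

    // Stand-in for the accumulator's incomplete-batch collection.
    static final List<Batch> incomplete = new CopyOnWriteArrayList<>();

    // Models the split path: the oversized batch's future completes,
    // and the halves are tracked as *new* incomplete batches.
    static void splitAndReenqueue(Batch big) {
        incomplete.add(new Batch());
        incomplete.add(new Batch());
        incomplete.remove(big);
        big.complete();
    }

    public static void main(String[] args) throws InterruptedException {
        Batch big = new Batch();
        incomplete.add(big);

        // flush() begins: awaitFlushCompletion snapshots the incomplete
        // list (like incomplete.copyAll()) while only `big` exists.
        List<Batch> snapshot = List.copyOf(incomplete);

        // Broker rejects `big` with MESSAGE_TOO_LARGE; it is split.
        splitAndReenqueue(big);

        // flush() resumes: every snapshotted future is already complete,
        // so the loop returns immediately...
        for (Batch b : snapshot)
            b.produceFuture.await();

        // ...while the two split halves are still in flight.
        System.out.println("in-flight after flush: " + incomplete.size());
    }
}
```

Run as written, the sketch reports two in-flight batches after the modeled flush returns, which is exactly the guarantee violation described above.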
> KafkaProducer flush behavior does not guarantee completed sends under record
> batch splitting
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-9312
> URL: https://issues.apache.org/jira/browse/KAFKA-9312
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.2.0, 2.3.0, 2.4.0
> Reporter: Lucas Bradstreet
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)