Niclas Lockner created KAFKA-12870:
--------------------------------------

             Summary: RecordAccumulator stuck in a flushing state
                 Key: KAFKA-12870
                 URL: https://issues.apache.org/jira/browse/KAFKA-12870
             Project: Kafka
          Issue Type: Bug
          Components: producer, streams
    Affects Versions: 2.8.0, 2.6.1
            Reporter: Niclas Lockner


After a Kafka Streams application with exactly-once semantics (EOS) enabled has
performed its first commit, the RecordAccumulator within the stream's internal
producer gets stuck in a state where all subsequently allocated ProducerBatches
are flushed immediately instead of being held in memory until they expire,
regardless of the stream's linger or batch size config.

This is reproduced in the example code found at <GitHub link to be added>, 
which can be run with ./gradlew run --args=<bootstrap servers>

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.
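
For reference, the stream configuration described above could be sketched as
follows. This is an illustration only, not the code from the example project:
the class name, the application id and the choice of "exactly_once" as the
processing guarantee are assumptions. Plain string keys are used so that the
snippet compiles without kafka-streams on the classpath; "producer." is the
prefix Kafka Streams uses for settings of its internal producer.

```java
import java.util.Properties;

// Hypothetical sketch of the Streams configuration described in the report.
class EosStreamsConfigSketch {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", "kafka-12870-repro");   // assumed application id
        props.put("bootstrap.servers", bootstrapServers);
        props.put("processing.guarantee", "exactly_once");  // EOS enabled (assumed variant)
        props.put("commit.interval.ms", "10000");           // commit interval = 10 sec
        props.put("producer.linger.ms", "5000");            // linger = 5 sec on the internal producer
        return props;
    }

    public static void main(String[] args) {
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        // Print the resulting configuration for inspection.
        build(servers).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```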

The expected behavior when running the example is that the stream's
ProducerBatches expire (or get flushed because of the commit) every 5 seconds,
and that the stream's producer sends one ProduceRequest every 5 seconds
containing an expired ProducerBatch with 5 records.

The actual behavior is that each ProducerBatch is made available to the Sender
immediately, and the Sender sends one ProduceRequest for each record.

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's
beginFlush() without a matching call to awaitFlushCompletion(), so
RecordAccumulator's flushesInProgress alternates between 1 and 2 instead of
the expected 0 and 1. Because the counter never returns to 0, a flush always
appears to be in progress.
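
The counter behavior described in the log entries can be illustrated with a
small stand-alone model. This is a simplified sketch, not the actual
RecordAccumulator code: the class FlushCounterModel and its sendable() helper
are hypothetical, and awaitFlushCompletion() here only decrements the counter
instead of also waiting for incomplete batches. It assumes that a batch is
treated as sendable whenever the flush counter is greater than zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of the flush counter; NOT the real RecordAccumulator.
class FlushCounterModel {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() { flushesInProgress.getAndIncrement(); }

    // The real method also waits for incomplete batches; only the decrement is modeled.
    void awaitFlushCompletion() { flushesInProgress.decrementAndGet(); }

    boolean flushInProgress() { return flushesInProgress.get() > 0; }

    int count() { return flushesInProgress.get(); }

    // Reduced form of the sendable check: a flush in progress overrides linger and batch size.
    boolean sendable(boolean full, boolean expired) {
        return full || expired || flushInProgress();
    }

    public static void main(String[] args) {
        FlushCounterModel acc = new FlushCounterModel();

        // A paired flush leaves the counter at 0, so linger is respected afterwards.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println(acc.count() + " " + acc.sendable(false, false)); // prints "0 false"

        // One beginFlush() without awaitFlushCompletion() leaves the counter at 1.
        acc.beginFlush();

        // Every later paired flush now moves the counter 1 -> 2 -> 1, never back to 0.
        for (int i = 0; i < 3; i++) {
            acc.beginFlush();
            acc.awaitFlushCompletion();
        }
        System.out.println(acc.count() + " " + acc.sendable(false, false)); // prints "1 true"
    }
}
```

In this model, a single unpaired beginFlush() is enough to make every batch
that is neither full nor expired sendable from then on, which matches the one
ProduceRequest per record seen in the example.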

This issue is not reproducible in version 2.3.1.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
