Niclas Lockner created KAFKA-12870:
--------------------------------------

             Summary: RecordAccumulator stuck in a flushing state
                 Key: KAFKA-12870
                 URL: https://issues.apache.org/jira/browse/KAFKA-12870
             Project: Kafka
          Issue Type: Bug
          Components: producer, streams
    Affects Versions: 2.8.0, 2.6.1
            Reporter: Niclas Lockner
After a Kafka Streams application with exactly-once semantics (EOS) enabled has performed its first commit, the RecordAccumulator inside the stream's internal producer gets stuck in a state where every ProducerBatch allocated afterwards is flushed immediately. The batches are no longer held in memory until their linger time expires, regardless of the stream's linger or batch size configuration.

This is reproduced in the example code found at <GitHub link to be added>, which can be run with

    ./gradlew run --args=<bootstrap servers>

The example has a producer that sends 1 record/sec to one topic, and a Kafka Streams application with EOS enabled that forwards the records from that topic to another topic. The stream is configured with linger = 5 sec and commit interval = 10 sec.

Expected behavior: the stream's ProducerBatches expire (or get flushed because of the commit) every 5 seconds, and the stream's producer sends one ProduceRequest every 5 seconds containing an expired ProducerBatch with 5 records.

Actual behavior: each ProducerBatch is made available to the Sender immediately, and the Sender sends one ProduceRequest per record.

The example code contains a copy of the RecordAccumulator class (copied from kafka-clients 2.8.0) with additional logging added to
* RecordAccumulator#ready(Cluster, long)
* RecordAccumulator#beginFlush()
* RecordAccumulator#awaitFlushCompletion()

These log entries show
* that the batches are considered sendable because a flush is in progress
* that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), and that this makes RecordAccumulator's flushesInProgress counter alternate between 1 and 2 instead of the expected 0 and 1

Since RecordAccumulator treats a flush as in progress whenever flushesInProgress is greater than 0, a counter that never drops back to 0 keeps every new batch sendable.

This issue is not reproducible in version 2.3.1.
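The counter arithmetic from the log analysis can be illustrated with a small stand-alone model. This is a hypothetical sketch, not the kafka-clients code: `FlushCounterModel`, `Accumulator`, `counter()` and `simulate` are illustrative names, and only the flush counter and the linger check are modeled (batch size, partitions, and the real waiting inside awaitFlushCompletion() are omitted). It assumes one record per second and a 5 s linger, as in the example, and models a commit-time flush as a paired beginFlush()/awaitFlushCompletion() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the flushesInProgress counter described in the report.
// NOT the kafka-clients RecordAccumulator: only the counter and the linger check are modeled.
public class FlushCounterModel {

    static final class Accumulator {
        private final AtomicInteger flushesInProgress = new AtomicInteger();
        private final long lingerMs;

        Accumulator(long lingerMs) {
            this.lingerMs = lingerMs;
        }

        void beginFlush() {
            flushesInProgress.incrementAndGet();
        }

        // The real method also waits for in-flight batches; this model only decrements.
        void awaitFlushCompletion() {
            flushesInProgress.decrementAndGet();
        }

        boolean flushInProgress() {
            return flushesInProgress.get() > 0;
        }

        int counter() {
            return flushesInProgress.get();
        }

        // Sendable if the batch has waited at least lingerMs or a flush is in progress
        // (the batch-full condition and other checks are left out of this model).
        boolean sendable(long waitedMs) {
            return waitedMs >= lingerMs || flushInProgress();
        }
    }

    // One record arrives per second for `records` seconds; returns the size of each sent batch.
    static List<Integer> simulate(Accumulator acc, int records) {
        List<Integer> sentBatchSizes = new ArrayList<>();
        int pending = 0;          // records in the currently open batch
        long batchCreatedMs = 0;  // arrival time of the open batch's first record
        for (int t = 0; t <= records; t++) {
            long nowMs = t * 1000L;
            // Sender check just before the next record arrives.
            if (pending > 0 && acc.sendable(nowMs - batchCreatedMs)) {
                sentBatchSizes.add(pending);
                pending = 0;
            }
            if (t == records) {
                break; // final check only, no more records
            }
            if (pending == 0) {
                batchCreatedMs = nowMs;
            }
            pending++;
            // Sender check right after the append.
            if (acc.sendable(nowMs - batchCreatedMs)) {
                sentBatchSizes.add(pending);
                pending = 0;
            }
        }
        return sentBatchSizes;
    }

    public static void main(String[] args) {
        // Healthy: the commit-time flush is paired, so the counter goes 0 -> 1 -> 0.
        Accumulator healthy = new Accumulator(5_000);
        healthy.beginFlush();
        healthy.awaitFlushCompletion();
        System.out.println("healthy counter=" + healthy.counter() + " batches=" + simulate(healthy, 10));

        // Reported bug: one unpaired beginFlush() leaves the counter at 1, so the next
        // paired flush moves it 1 -> 2 -> 1 and flushInProgress() stays true.
        Accumulator stuck = new Accumulator(5_000);
        stuck.beginFlush();            // unpaired call, as reported for the Sender
        stuck.beginFlush();            // commit-time flush starts ...
        stuck.awaitFlushCompletion();  // ... and completes, but the counter is still 1
        System.out.println("stuck counter=" + stuck.counter() + " batches=" + simulate(stuck, 10));
    }
}
```

Running main prints `healthy counter=0 batches=[5, 5]` and then `stuck counter=1 batches=[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]`, mirroring the expected behavior (two batches of 5 records over 10 seconds) and the reported one (one batch per record).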