Kyle Tinker created KAFKA-6512:
----------------------------------

             Summary: Java Producer: Excessive memory usage with compression enabled
                 Key: KAFKA-6512
                 URL: https://issues.apache.org/jira/browse/KAFKA-6512
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 1.0.0
         Environment: Windows 10
            Reporter: Kyle Tinker
         Attachments: KafkaSender.java

h2. User Story

As a user of the Java producer, I want predictable memory usage from the Kafka 
client so that I can ensure that my system is sized appropriately and will be 
stable even under heavy usage.

As a user of the Java producer, I want a smaller memory footprint so that my 
systems don't consume as many resources.

h2. Acceptance Criteria
 * Enabling compression in the Java producer should not significantly increase 
its memory usage
 * The memory usage of Kafka's Java producer should be roughly in line with the 
buffer size (buffer.memory) multiplied by the number of producers declared.

h2. Additional Information

I've observed high memory usage in the producer when enabling compression (gzip 
or lz4).  I don't observe the behavior with compression off, but with it on 
I'll run out of heap (2GB).  Using a Java profiler, I see the data is held in 
KafkaLZ4BlockOutputStream (or the equivalent class for gzip).  I see that 
MemoryRecordsBuilder:closeForRecordAppends() tries to deal with this, but is 
not successful.  I'm most likely network-bottlenecked, so I expect the producer 
buffers to be full while the job is running, along with potentially a lot of 
unacknowledged records.

I've tried using the default buffer.memory with 20 producers (across 20 
threads), sending data as quickly as I can.  I've also tried 1MB of 
buffer.memory, which seemed to reduce memory consumption, but I could still run 
out of memory in certain cases.  I have max.in.flight.requests.per.connection 
set to 1.  In short, I should only have ~20MB (20 * 1MB) of data in buffers, 
yet Kafka can easily exhaust 2000MB.
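
For reference, here is a minimal sketch of a producer configured as described 
above.  This is not the attached KafkaSender.java; the broker address, topic 
name, and class name are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // the issue also reproduces with "gzip"
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "1000000"); // 1MB, as in the test above
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String payload = new String(new char[1024]); // ~1KB messages, as described above
            while (true) {
                // The test described above ran 20 producers like this across 20 threads.
                producer.send(new ProducerRecord<>("test", payload));
            }
        }
    }
}
{code}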

In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
doesn't clear the compressedBuffer or buffer when close() is called.  In my 
heap dump, both of those are ~65KB each, meaning that each batch is taking up 
~148KB of space, of which 131KB is buffers (buffer.memory=1,000,000 and 
messages are 1KB each until the batch fills).

Kafka tries to manage memory usage by calling 
MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
resources required for record appends (e.g. compression buffers)".  However, 
this method doesn't actually clear those buffers, because 
KafkaLZ4BlockOutputStream.close() only writes the final block and end mark and 
closes the output stream; it never clears the buffer and compressedBuffer 
fields.  Those stay allocated in RAM until the batch is acknowledged by the 
broker, processed in Sender:handleProduceResponse(), and deallocated.  Memory 
usage therefore increases, possibly without bound.  In my test program, the 
process died with approximately 345 unprocessed batches per producer (20 
producers), despite having max.in.flight.requests.per.connection=1; at ~148KB 
per batch, that works out to roughly 345 * 20 * 148KB ≈ 1GB of retained 
buffers.

h2. Steps to Reproduce
 # Create a topic named test with plenty of storage
 # Use a connection with a very fast upload pipe and limited download.  This 
allows the outbound data to go out while acknowledgements are delayed flowing 
back in.
 # Download KafkaSender.java (attached to this ticket)
 # Set line 17 to reference your Kafka broker
 # Run the program with a 1GB Xmx value (an example command follows below)
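
For step 5, the command would look something like the following (the classpath 
placeholder must include kafka-clients and its dependencies):

{code}
java -Xmx1g -cp <kafka-clients jar and dependencies>:. KafkaSender
{code}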

h2. Possible solutions

There are a few possible optimizations I can think of:
 # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
non-final and null them in the close() method
 # We could declare the MemoryRecordsBuilder.appendStream non-final and null it 
in the closeForRecordAppends() method
 # We could have the ProducerBatch discard the recordsBuilder in 
closeForRecordAppends(), however, this is likely a bad idea because the 
recordsBuilder contains significant metadata that is likely needed after the 
stream is closed.  It is also final.
 # We could try to limit the number of non-acknowledged batches in flight.  
This would bound the maximum memory usage but may negatively impact performance.

 

Fix #1 would only improve LZ4, and not any of the other compression algorithms.
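
To make fix #1 concrete, here is a rough sketch of the close() change.  The 
field and method names follow the description above; this is illustrative, not 
an actual patch:

{code:java}
// Sketch of fix #1: release the LZ4 buffers once the stream is closed.
// Assumes buffer and compressedBuffer have been made non-final.
@Override
public void close() throws IOException {
    if (!finished) {
        writeBlock();    // flush any buffered uncompressed data
        writeEndMark();  // terminate the LZ4 frame
        finished = true;
    }
    if (out != null) {
        out.close();
        out = null;
    }
    buffer = null;           // ~64KB input buffer, now eligible for GC
    compressedBuffer = null; // ~64KB compressed buffer, now eligible for GC
}
{code}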

Fix #2 would improve all algorithms, compression and otherwise.  Of the first 
three proposed here, it seems the best.  It would also involve checking 
appendStreamIsClosed in every usage of appendStream within MemoryRecordsBuilder 
to avoid NPEs.
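
A sketch of what fix #2 might look like inside MemoryRecordsBuilder (again 
illustrative rather than an actual patch):

{code:java}
// Sketch of fix #2: drop the compression stream (and its buffers) as soon as
// record appends are finished, instead of holding it until the batch is acked.
private DataOutputStream appendStream; // was final
private boolean appendStreamIsClosed = false;

public void closeForRecordAppends() {
    if (!appendStreamIsClosed) {
        try {
            appendStream.close(); // writes the final block / end mark
        } catch (IOException e) {
            throw new KafkaException(e);
        }
        appendStream = null;      // compression buffers become collectable here
        appendStreamIsClosed = true;
    }
}

// Every remaining use of appendStream needs a guard like this to avoid NPEs:
private void ensureOpenForRecordAppend() {
    if (appendStreamIsClosed)
        throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
}
{code}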

Fix #4 is likely necessary if we want to bound the maximum memory usage of 
Kafka.  Removing the buffers via Fix #1 or #2 would reduce the memory usage by 
~90%, but theoretically there is still no limit.
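
In the meantime, the idea behind fix #4 can be approximated in application 
code by bounding the number of unacknowledged sends.  A minimal sketch, where 
the permit count of 100 is an arbitrary example:

{code:java}
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedSender {
    // Arbitrary example bound on unacknowledged records; tune for your workload.
    private final Semaphore inFlight = new Semaphore(100);
    private final KafkaProducer<String, String> producer;

    public BoundedSender(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void send(ProducerRecord<String, String> record) throws InterruptedException {
        inFlight.acquire(); // blocks once 100 sends are awaiting acknowledgement
        producer.send(record, (metadata, exception) -> inFlight.release());
    }
}
{code}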


