[jira] [Commented] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365989#comment-16365989
 ] 

ASF GitHub Bot commented on KAFKA-6512:
---

rajinisivaram closed pull request #4570: KAFKA-6512: Discard references to 
buffers used for compression
URL: https://github.com/apache/kafka/pull/4570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37be826..591ab169364 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
 
@@ -71,7 +71,7 @@
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ private void writeBlock() throws IOException {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ private void ensureNotFinished() {
 
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac22df..6f6404fa2d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final

[jira] [Commented] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363820#comment-16363820
 ] 

ASF GitHub Bot commented on KAFKA-6512:
---

rajinisivaram opened a new pull request #4570: KAFKA-6512: Discard references 
to buffers used for compression
URL: https://github.com/apache/kafka/pull/4570
 
 
   `ProducerBatch` retains a reference to its `MemoryRecordsBuilder` and cannot be 
freed until acks are received. Removing the references to the buffers used for 
compression once the records are built enables those buffers to be garbage 
collected sooner, reducing the risk of OOM.
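   
   To make the idea concrete, here is a minimal sketch of the release pattern 
   (hypothetical class and field names, not the actual Kafka internals): once 
   appends are finished, the builder drops its reference to the compressing 
   stream, so the stream's large internal buffers become unreachable while the 
   compact built records stay retained for sending and retries:
   
       import java.io.ByteArrayOutputStream;
       import java.io.IOException;
       import java.io.OutputStream;
       import java.util.zip.GZIPOutputStream;
   
       // Hypothetical illustration of the release pattern described above.
       class ReleasableBuilder {
           private final ByteArrayOutputStream built = new ByteArrayOutputStream();
           private OutputStream appendStream; // the compressor holds sizeable buffers
   
           ReleasableBuilder() throws IOException {
               appendStream = new GZIPOutputStream(built);
           }
   
           void append(byte[] record) throws IOException {
               appendStream.write(record);
           }
   
           // Analogous in spirit to closeForRecordAppends(): after this call
           // only `built` stays reachable; the compressor can be collected.
           void closeForRecordAppends() throws IOException {
               if (appendStream != null) {
                   appendStream.close();   // flush any buffered, compressed data
                   appendStream = null;    // drop the reference for the GC
               }
           }
   
           byte[] builtRecords() {
               return built.toByteArray();
           }
       }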
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Java Producer: Excessive memory usage with compression enabled
> --
>
> Key: KAFKA-6512
> URL: https://issues.apache.org/jira/browse/KAFKA-6512
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
> Environment: Windows 10
>Reporter: Kyle Tinker
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
> Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want a predictable memory usage for the 
> Kafka client so that I can ensure that my system is sized appropriately and 
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my 
> systems don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling compression in Kafka should not significantly increase the memory 
> usage of Kafka.
>  * The memory usage of Kafka's Java producer should be roughly in line with 
> the buffer size (buffer.memory) and the number of producers declared (a 
> sample configuration is sketched below).
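> For reference, a producer configured along these lines exercises exactly 
> these settings (a minimal sketch; the broker address and serializers are 
> placeholders):
> 
>     import java.util.Properties;
>     import org.apache.kafka.clients.producer.KafkaProducer;
> 
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "broker:9092"); // placeholder address
>     props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("compression.type", "lz4");       // gzip shows the same behavior
>     props.put("buffer.memory", 1_000_000L);     // the 1 MB setting tried below
>     props.put("max.in.flight.requests.per.connection", 1);
>     KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);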
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression 
> (gzip or lz4). I don't observe the behavior with compression off, but with 
> it on I'll run out of heap (2 GB). Using a Java profiler, I see the data is 
> in the KafkaLZ4BlockOutputStream (or the related class for gzip). I see that 
> MemoryRecordsBuilder.closeForRecordAppends() is trying to deal with this, but 
> is not successful. I'm most likely network-bottlenecked, so I expect the 
> producer buffers to be full while the job is running, with potentially a lot 
> of unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 
> threads) and sending data as quickly as I can. I've also tried 1 MB of 
> buffer.memory, which seemed to reduce memory consumption, but I could still 
> run OOM in certain cases. I have max.in.flight.requests.per.connection set 
> to 1. In short, I should only have ~20 MB (20 × 1 MB) of data in buffers, 
> but Kafka's usage can easily reach 2000 MB.
> In looking at the code more, it looks like KafkaLZ4BlockOutputStream 
> doesn't clear the compressedBuffer or buffer when close() is called. In my 
> heap dump, both of those are ~65k each, meaning that each batch takes up 
> ~148k of space, of which 131k is buffers (buffer.memory=1,000,000, with 
> messages of 1k each until the batch fills).
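> (Arithmetic check: the two ~64 KiB buffers account for 2 × 65,536 B = 
> 131,072 B ≈ 131k, i.e. roughly 88% of the ~148k per-batch footprint, 
> leaving only ~17k for the batch's own records and bookkeeping.)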
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder.closeForRecordAppends(), which is documented as "Release 
> resources required for record appends (e.g. compression buffers)". However, 
> this method doesn't actually clear those buffers, because 
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and 
> closes the output stream; it doesn't clear the buffer and 
> compressedBuffer fields of KafkaLZ4BlockOutputStream. Those stay allocated 
> until the batch is acknowledged by the broker, processed in 
> Sender.handleProduceResponse(), and deallocated. Memory usage therefore 
> grows, possibly without bound. In my test program, the program died with 
> approximately 345 unprocessed batches per producer (20 producers), despite 
> having max.in.flight.requests.per.connection=1.
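> As the diff in the comment above shows, the fix merged for this ticket 
> nulls out these references when the stream is closed; the essential shape 
> of the patched close(), copied from that diff, is:
> 
>     @Override
>     public void close() throws IOException {
>         try {
>             if (!finished) {
>                 writeBlock();    // flush the buffer, writing the last block
>                 writeEndMark();  // write the end mark
>             }
>         } finally {
>             try {
>                 if (out != null) {
>                     try (OutputStream outStream = out) {
>                         outStream.flush();
>                     }
>                 }
>             } finally {
>                 out = null;               // drop the wrapped stream
>                 buffer = null;            // release the ~64 KiB input buffer
>                 compressedBuffer = null;  // release the ~64 KiB compressed buffer
>                 finished = true;
>             }
>         }
>     }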
> h2. Steps to Reproduce
>  # Create a topic "test" with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download. This 
> allows the outbound data to go out, but acknowledgements to be delayed 
> flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the