This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1067cc3  KAFKA-6512: Discard references to buffers used for compression (#4570)
1067cc3 is described below

commit 1067cc3422e6f2731ae57df9f01e84f2242a60be
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Thu Feb 15 17:36:44 2018 +0000

    KAFKA-6512: Discard references to buffers used for compression (#4570)
    
    ProducerBatch retains references to MemoryRecordsBuilder and cannot be freed until acks are received. Removing references to buffers used for compression after records are built will enable these to be garbage collected sooner, reducing the risk of OOM.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Lothsahn <loths...@gmail.com>
---
 .../common/record/KafkaLZ4BlockOutputStream.java   | 40 ++++++++++++++--------
 .../kafka/common/record/MemoryRecordsBuilder.java  | 21 ++++++++----
 .../common/record/MemoryRecordsBuilderTest.java    | 34 ++++++++++++++++++
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37b..591ab16 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@ import net.jpountz.xxhash.XXHashFactory;
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@ public final class KafkaLZ4BlockOutputStream extends 
FilterOutputStream {
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
 
@@ -71,7 +71,7 @@ public final class KafkaLZ4BlockOutputStream extends 
FilterOutputStream {
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean 
blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ public final class KafkaLZ4BlockOutputStream extends 
FilterOutputStream {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ public final class KafkaLZ4BlockOutputStream extends 
FilterOutputStream {
 
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac..6f6404f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@ import static 
org.apache.kafka.common.utils.Utils.wrapNullable;
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new 
DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed 
for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final DataOutputStream appendStream;
     // Used to hold a reference to the underlying ByteBuffer so that we can 
write the record batch header and access
     // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if 
the existing one is not large enough,
     // so it's not safe to hold a direct reference to the underlying 
ByteBuffer.
@@ -60,7 +65,8 @@ public class MemoryRecordsBuilder {
     // from previous batches before appending any records.
     private float estimatedCompressionRatio = 1.0F;
 
-    private boolean appendStreamIsClosed = false;
+    // Used to append records, may compress data on the fly
+    private DataOutputStream appendStream;
     private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
@@ -265,12 +271,13 @@ public class MemoryRecordsBuilder {
      * possible to update the RecordBatch header.
      */
     public void closeForRecordAppends() {
-        if (!appendStreamIsClosed) {
+        if (appendStream != CLOSED_STREAM) {
             try {
                 appendStream.close();
-                appendStreamIsClosed = true;
             } catch (IOException e) {
                 throw new KafkaException(e);
+            } finally {
+                appendStream = CLOSED_STREAM;
             }
         }
     }
@@ -663,7 +670,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void ensureOpenForRecordAppend() {
-        if (appendStreamIsClosed)
+        if (appendStream == CLOSED_STREAM)
             throw new IllegalStateException("Tried to append a record, but 
MemoryRecordsBuilder is closed for record appends");
     }
 
@@ -738,7 +745,7 @@ public class MemoryRecordsBuilder {
     public boolean isFull() {
         // note that the write limit is respected only after the first record 
is added which ensures we can always
         // create non-empty batches (this is used to disable batching when the 
producer's batch size is set to 0).
-        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit 
<= estimatedBytesWritten());
+        return appendStream == CLOSED_STREAM || (this.numRecords > 0 && 
this.writeLimit <= estimatedBytesWritten());
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c713d17..a90fb29 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -644,6 +645,39 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
+    @Test
+    public void testBuffersDereferencedOnClose() {
+        Runtime runtime = Runtime.getRuntime();
+        int payloadLen = 1024 * 1024;
+        ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+        byte[] key = new byte[0];
+        byte[] value = new byte[payloadLen];
+        new Random().nextBytes(value); // Use random payload so that 
compressed buffer is large
+        List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
+        long startMem = 0;
+        long memUsed = 0;
+        int iterations =  0;
+        while (iterations++ < 100) {
+            buffer.rewind();
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V2, compressionType,
+                    TimestampType.CREATE_TIME, 0L, 0L, 
RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 
false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
+            builder.append(1L, new byte[0], value);
+            builder.build();
+            builders.add(builder);
+
+            System.gc();
+            memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
+            // Ignore memory usage during initialization
+            if (iterations == 2)
+                startMem = memUsed;
+            else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
+                break;
+        }
+        assertTrue("Memory usage too high: " + memUsed, iterations < 100);
+    }
+
     private void verifyRecordsProcessingStats(RecordsProcessingStats 
processingStats, int numRecords,
             int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);

-- 
To stop receiving notification emails like this one, please contact
rsiva...@apache.org.

Reply via email to