This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cce63274f2f KAFKA-16086: Fix memory leak in RocksDBStore (#15135)
cce63274f2f is described below

commit cce63274f2fdf9a4db014e2bae8019677b2cd7b2
Author: Nick Telford <nick.telf...@gmail.com>
AuthorDate: Fri Jan 5 14:40:00 2024 +0000

    KAFKA-16086: Fix memory leak in RocksDBStore (#15135)
    
    We allocate an `Options` in order to list column families while opening
    the `RocksDBStore`, but never explicitly `close()` it.
    
    `Options` is a RocksDB native object, which needs to be explicitly
    closed to free the resources it allocates in native memory.
    
    Failing to close this causes a memory leak when repeatedly
    opening/closing stores.
    
    It's an `AutoCloseable`, and all usage of it is confined to the
    surrounding `try` block, so we can just hoist it into the `try`
    resource declaration to auto-close it when done.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../java/org/apache/kafka/streams/state/internals/RocksDBStore.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b952f1b9950..3752e07f89f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -308,8 +308,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         allDescriptors.add(defaultColumnFamilyDescriptor);
         allDescriptors.addAll(extraDescriptors);
 
-        try {
-            final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions());
+        try (final Options options = new Options(dbOptions, defaultColumnFamilyDescriptor.getOptions())) {
             final List<byte[]> allExisting = RocksDB.listColumnFamilies(options, absolutePath);
 
             final List<ColumnFamilyDescriptor> existingDescriptors = new LinkedList<>();
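
The effect of this change can be illustrated with a minimal, self-contained sketch. It does not use RocksDB: `NativeHandle` is a hypothetical stand-in for a native-backed `AutoCloseable` such as `Options`, and the `OPEN` counter only simulates outstanding native allocations. The method names `openLeaky` and `openSafely` are likewise illustrative and do not appear in `RocksDBStore`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TryWithResourcesSketch {

    // Hypothetical stand-in for a native-backed object such as RocksDB's Options.
    static final class NativeHandle implements AutoCloseable {
        // Counts handles that were created but not yet closed.
        static final AtomicInteger OPEN = new AtomicInteger();

        NativeHandle() {
            OPEN.incrementAndGet(); // simulates allocating native memory
        }

        @Override
        public void close() {
            OPEN.decrementAndGet(); // simulates freeing native memory
        }
    }

    // Placeholder for work done with the handle (e.g. listing column families).
    static void use(final NativeHandle handle) {
    }

    // Shape of the code before the fix: the handle is created inside the
    // try body and close() is never called, so every call leaks one handle.
    static void openLeaky() {
        try {
            final NativeHandle handle = new NativeHandle();
            use(handle);
        } catch (final RuntimeException e) {
            throw new IllegalStateException(e);
        }
    }

    // Shape of the code after the fix: the handle is declared as a
    // try-with-resources resource, so close() runs when the block exits,
    // whether normally or via an exception.
    static void openSafely() {
        try (final NativeHandle handle = new NativeHandle()) {
            use(handle);
        } catch (final RuntimeException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final int start = NativeHandle.OPEN.get();
        for (int i = 0; i < 3; i++) {
            openSafely();
        }
        System.out.println("leaked after openSafely: " + (NativeHandle.OPEN.get() - start));
        for (int i = 0; i < 3; i++) {
            openLeaky();
        }
        System.out.println("leaked after openLeaky: " + (NativeHandle.OPEN.get() - start));
    }
}
```

In this sketch the count of unclosed handles stays flat across repeated `openSafely()` calls and grows by one on each `openLeaky()` call, which mirrors the leak described above when stores are opened and closed repeatedly.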
