This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee565f5f6b KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
ee565f5f6b is described below

commit ee565f5f6b97c84d4f7f895fcb79188822284414
Author: jnewhouse <jnewho...@quantcast.com>
AuthorDate: Thu Jun 16 14:27:38 2022 -0700

    KAFKA-13939: Only track dirty keys if logging is enabled. (#12263)
    
    InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen
    in order to log them for durability. This set is neither used nor cleared
    if logging is not enabled, so populating it creates a memory leak. This
    change stops populating the set if logging is not enabled.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Kvicii <42023367+kvi...@users.noreply.github.com>, Guozhang Wang <wangg...@gmail.com>
---
 .../state/internals/InMemoryTimeOrderedKeyValueBuffer.java        | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 5894023bbe..5403f9e703 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -423,7 +423,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 delegate.remove();
                 index.remove(next.getKey().key());
 
-                dirtyKeys.add(next.getKey().key());
+                if (loggingEnabled) {
+                    dirtyKeys.add(next.getKey().key());
+                }
 
                 memBufferSize -= computeRecordSize(next.getKey().key(), 
bufferValue);
 
@@ -497,7 +499,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             serializedKey,
             new BufferValue(serializedPriorValue, serialChange.oldValue, 
serialChange.newValue, recordContext)
         );
-        dirtyKeys.add(serializedKey);
+        if (loggingEnabled) {
+            dirtyKeys.add(serializedKey);
+        }
         updateBufferMetrics();
     }
 

Reply via email to