This is an automated email from the ASF dual-hosted git repository.

Aias00 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit c23be1b5b7ebf38319a42e9ba466b6832f96f73d
Author: liuhy <[email protected]>
AuthorDate: Fri Jun 5 14:25:43 2026 +0800

    fix(arrow): fix RootAllocator native memory leaks in Arrow serialization
    
    - CollectRep: bind the allocator's lifecycle to MetricsData and close it
      in MetricsData.close()
    - ArrowUtil: create a per-root allocator in deserializeMetricsData so each
      MetricsData can be closed independently without corrupting the others;
      deserializeMultipleRoots now takes its allocator from the caller
    - KafkaMetricsDataDeserializer: pass the allocator to MetricsData instead of
      leaking a new one on every deserialization call
    - ArrowUtilTest: update the test for the new
      deserializeMultipleRoots(byte[], BufferAllocator) signature
    
    RootAllocator manages off-heap (native) memory in Arrow. Previously, these
    allocators were never closed on success paths, so native memory grew without
    bound with every metrics collection cycle.
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../common/entity/message/CollectRep.java          | 26 ++++++++++++---
 .../serialize/KafkaMetricsDataDeserializer.java    | 18 ++++++++--
 .../apache/hertzbeat/common/util/ArrowUtil.java    | 39 ++++++++++++++++------
 .../hertzbeat/common/util/ArrowUtilTest.java       |  6 ++--
 4 files changed, 69 insertions(+), 20 deletions(-)

diff --git 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java
 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java
index 03d2821c5d..93a3d51e7c 100644
--- 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java
+++ 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java
@@ -125,12 +125,24 @@ public final class CollectRep {
          */
         private ArrowTable table;
 
+        private BufferAllocator allocator;
+
         public MetricsData(ArrowTable table) {
+            this(table, null);
+        }
+
+        public MetricsData(ArrowTable table, BufferAllocator allocator) {
             this.table = table;
+            this.allocator = allocator;
         }
-        
+
         public MetricsData(VectorSchemaRoot vectorSchemaRoot) {
+            this(vectorSchemaRoot, null);
+        }
+
+        public MetricsData(VectorSchemaRoot vectorSchemaRoot, BufferAllocator 
allocator) {
             this.table = new ArrowTable(vectorSchemaRoot);
+            this.allocator = allocator;
         }
 
         public static Builder newBuilder() {
@@ -313,6 +325,9 @@ public final class CollectRep {
             if (table != null) {
                 table.close();
             }
+            if (allocator != null) {
+                allocator.close();
+            }
         }
 
         // Builder remains mostly the same, but build() method changes
@@ -427,7 +442,7 @@ public final class CollectRep {
                     // Create Schema with metadata
                     Schema schema = new Schema(arrowFields, metadata);
                     VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator);
-                    
+
                     try {
                         root.allocateNew();
                         int rowCount = values.size();
@@ -451,12 +466,13 @@ public final class CollectRep {
                             }
                             vector.setValueCount(rowCount);
                         }
-                        return new MetricsData(new ArrowTable(root));
+                        // ArrowTable takes ownership of root's vectors;
+                        // allocator lifecycle is bound to the returned 
MetricsData
+                        return new MetricsData(new ArrowTable(root), 
allocator);
                     } catch (Exception e1) {
                         log.error(e1.getMessage(), e1);
-                        throw e1;
-                    } finally {
                         root.close();
+                        throw e1;
                     }
                 } catch (Exception e) {
                     log.error(e.getMessage(), e);
diff --git 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
index 3d4b95b4fe..3d7c517962 100644
--- 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
+++ 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java
@@ -41,12 +41,24 @@ public class KafkaMetricsDataDeserializer implements 
Deserializer<CollectRep.Met
 
     @Override
     public CollectRep.MetricsData deserialize(String s, byte[] bytes){
-        try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
-             ArrowStreamReader reader = new 
ArrowStreamReader(Channels.newChannel(in), new RootAllocator())) {
+        RootAllocator allocator = new RootAllocator();
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        ArrowStreamReader reader = new 
ArrowStreamReader(Channels.newChannel(in), allocator);
+        try {
             VectorSchemaRoot root = reader.getVectorSchemaRoot();
             reader.loadNextBatch();
-            return new CollectRep.MetricsData(new ArrowTable(root));
+            // The MetricsData takes ownership of the allocator and will close 
it when needed.
+            // The reader's lifecycle is tied to the root - closing the 
allocator will
+            // reclaim all memory including the reader's buffers.
+            return new CollectRep.MetricsData(new ArrowTable(root), allocator);
         } catch (IOException e) {
+            // On error, close the allocator to release all resources
+            try {
+                reader.close();
+            } catch (IOException closeEx) {
+                // Ignore close exception
+            }
+            allocator.close();
             throw new RuntimeException("Failed to deserialize Arrow table", e);
         }
     }
diff --git 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
index 54b5143187..9089a8b918 100644
--- 
a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
+++ 
b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
@@ -18,6 +18,7 @@
 package org.apache.hertzbeat.common.util;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
@@ -88,12 +89,11 @@ public final class ArrowUtil {
      * @return List of deserialized VectorSchemaRoot objects
      * @throws RuntimeException if deserialization fails
      */
-    public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data) 
{
+    public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data, 
BufferAllocator allocator) {
         List<VectorSchemaRoot> roots = new ArrayList<>();
         ByteBuffer buffer = ByteBuffer.wrap(data);
         try {
             int rootCount = buffer.getInt();
-            RootAllocator allocator = new RootAllocator();
 
             for (int i = 0; i < rootCount; i++) {
                 int length = buffer.getInt();
@@ -102,6 +102,8 @@ public final class ArrowUtil {
                 ByteArrayInputStream rootIn = new ByteArrayInputStream(data, 
buffer.position(), length);
                 buffer.position(buffer.position() + length);
 
+                // Note: reader lifecycle is tied to the returned 
VectorSchemaRoot —
+                // closing the reader would invalidate the root's vectors.
                 ArrowStreamReader reader = new ArrowStreamReader(
                         Channels.newChannel(rootIn),
                         allocator);
@@ -131,17 +133,34 @@ public final class ArrowUtil {
         if (data == null || data.length == 0) {
             return new ArrayList<>();
         }
-        List<VectorSchemaRoot> roots = deserializeMultipleRoots(data);
-        List<CollectRep.MetricsData> metricsDataList = new 
ArrayList<>(roots.size());
+        List<CollectRep.MetricsData> metricsDataList = new ArrayList<>();
+        ByteBuffer buffer = ByteBuffer.wrap(data);
         try {
-            for (VectorSchemaRoot root : roots) {
-                if (root != null) {
-                    CollectRep.MetricsData metricsData = new 
CollectRep.MetricsData(root);
-                    metricsDataList.add(metricsData);
+            int rootCount = buffer.getInt();
+            for (int i = 0; i < rootCount; i++) {
+                int length = buffer.getInt();
+                ByteArrayInputStream rootIn = new ByteArrayInputStream(data, 
buffer.position(), length);
+                buffer.position(buffer.position() + length);
+                // Each MetricsData gets its own allocator so they can be 
closed independently.
+                // The reader is intentionally not closed here — its lifecycle 
is tied to the root,
+                // and closing it would invalidate the root's vectors. The 
allocator.close() in
+                // MetricsData.close() will reclaim all memory allocated by 
both the reader and root.
+                RootAllocator allocator = new RootAllocator();
+                ArrowStreamReader reader = new 
ArrowStreamReader(Channels.newChannel(rootIn), allocator);
+                VectorSchemaRoot root = reader.getVectorSchemaRoot();
+                reader.loadNextBatch();
+                metricsDataList.add(new CollectRep.MetricsData(root, 
allocator));
+            }
+        } catch (IOException e) {
+            // Clean up any MetricsData objects created before the failure to 
prevent memory leaks
+            for (CollectRep.MetricsData metricsData : metricsDataList) {
+                try {
+                    metricsData.close();
+                } catch (Exception closeEx) {
+                    log.warn("Failed to close MetricsData during error 
cleanup: {}", closeEx.getMessage());
                 }
             }
-        } finally {
-            roots.forEach(VectorSchemaRoot::close);
+            throw new RuntimeException("Failed to deserialize metrics data", 
e);
         }
         return metricsDataList;
     }
diff --git 
a/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
 
b/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
index beb0d2cd9b..d74fe2b29a 100644
--- 
a/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
+++ 
b/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
@@ -67,8 +67,9 @@ class ArrowUtilTest {
 
         // Serialize
         byte[] data = ArrowUtil.serializeMultipleRoots(roots);
-        // Deserialize
-        List<VectorSchemaRoot> deserializedRoots = 
ArrowUtil.deserializeMultipleRoots(data);
+        // Deserialize — pass a fresh allocator for deserialization
+        RootAllocator deserializeAllocator = new RootAllocator();
+        List<VectorSchemaRoot> deserializedRoots = 
ArrowUtil.deserializeMultipleRoots(data, deserializeAllocator);
 
         Assertions.assertEquals(2, deserializedRoots.size());
         VectorSchemaRoot resultRoot1 = deserializedRoots.get(0);
@@ -81,6 +82,7 @@ class ArrowUtilTest {
         // Cleanup
         roots.forEach(VectorSchemaRoot::close);
         deserializedRoots.forEach(VectorSchemaRoot::close);
+        deserializeAllocator.close();
         allocator.close();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to