This is an automated email from the ASF dual-hosted git repository.

Aias00 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit c23be1b5b7ebf38319a42e9ba466b6832f96f73d Author: liuhy <[email protected]> AuthorDate: Fri Jun 5 14:25:43 2026 +0800 fix(arrow): fix RootAllocator native memory leaks in Arrow serialization - CollectRep: bind allocator lifecycle to MetricsData, close on MetricsData.close() - ArrowUtil: create per-root allocator in deserializeMetricsData so each MetricsData can be independently closed without corrupting others - KafkaMetricsDataDeserializer: pass allocator to MetricsData instead of leaking it on every deserialization call - ArrowUtilTest: update test to match new deserializeMultipleRoots API RootAllocator allocates off-heap (native) memory via Arrow. Previously the allocator was never closed on success paths, causing unbounded native memory growth with every metrics collection cycle. Co-Authored-By: Claude Opus 4.8 <[email protected]> --- .../common/entity/message/CollectRep.java | 26 ++++++++++++--- .../serialize/KafkaMetricsDataDeserializer.java | 18 ++++++++-- .../apache/hertzbeat/common/util/ArrowUtil.java | 39 ++++++++++++++++------ .../hertzbeat/common/util/ArrowUtilTest.java | 6 ++-- 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java index 03d2821c5d..93a3d51e7c 100644 --- a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java +++ b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/entity/message/CollectRep.java @@ -125,12 +125,24 @@ public final class CollectRep { */ private ArrowTable table; + private BufferAllocator allocator; + public MetricsData(ArrowTable table) { + this(table, null); + } + + public MetricsData(ArrowTable table, BufferAllocator allocator) { this.table = table; + this.allocator = allocator; } - + public MetricsData(VectorSchemaRoot vectorSchemaRoot) { + 
this(vectorSchemaRoot, null); + } + + public MetricsData(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator) { this.table = new ArrowTable(vectorSchemaRoot); + this.allocator = allocator; } public static Builder newBuilder() { @@ -313,6 +325,9 @@ public final class CollectRep { if (table != null) { table.close(); } + if (allocator != null) { + allocator.close(); + } } // Builder remains mostly the same, but build() method changes @@ -427,7 +442,7 @@ public final class CollectRep { // Create Schema with metadata Schema schema = new Schema(arrowFields, metadata); VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); - + try { root.allocateNew(); int rowCount = values.size(); @@ -451,12 +466,13 @@ public final class CollectRep { } vector.setValueCount(rowCount); } - return new MetricsData(new ArrowTable(root)); + // ArrowTable takes ownership of root's vectors; + // allocator lifecycle is bound to the returned MetricsData + return new MetricsData(new ArrowTable(root), allocator); } catch (Exception e1) { log.error(e1.getMessage(), e1); - throw e1; - } finally { root.close(); + throw e1; } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java index 3d4b95b4fe..3d7c517962 100644 --- a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java +++ b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/serialize/KafkaMetricsDataDeserializer.java @@ -41,12 +41,24 @@ public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.Met @Override public CollectRep.MetricsData deserialize(String s, byte[] bytes){ - try (ByteArrayInputStream in = new ByteArrayInputStream(bytes); - ArrowStreamReader reader = new ArrowStreamReader(Channels.newChannel(in), new 
RootAllocator())) { + RootAllocator allocator = new RootAllocator(); + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + ArrowStreamReader reader = new ArrowStreamReader(Channels.newChannel(in), allocator); + try { VectorSchemaRoot root = reader.getVectorSchemaRoot(); reader.loadNextBatch(); - return new CollectRep.MetricsData(new ArrowTable(root)); + // The MetricsData takes ownership of the allocator and will close it when needed. + // The reader's lifecycle is tied to the root - closing the allocator will + // reclaim all memory including the reader's buffers. + return new CollectRep.MetricsData(new ArrowTable(root), allocator); } catch (IOException e) { + // On error, close the allocator to release all resources + try { + reader.close(); + } catch (IOException closeEx) { + // Ignore close exception + } + allocator.close(); throw new RuntimeException("Failed to deserialize Arrow table", e); } } diff --git a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java index 54b5143187..9089a8b918 100644 --- a/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java +++ b/hertzbeat-common-core/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java @@ -18,6 +18,7 @@ package org.apache.hertzbeat.common.util; import lombok.extern.slf4j.Slf4j; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; @@ -88,12 +89,11 @@ public final class ArrowUtil { * @return List of deserialized VectorSchemaRoot objects * @throws RuntimeException if deserialization fails */ - public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data) { + public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data, BufferAllocator allocator) { List<VectorSchemaRoot> roots = new 
ArrayList<>(); ByteBuffer buffer = ByteBuffer.wrap(data); try { int rootCount = buffer.getInt(); - RootAllocator allocator = new RootAllocator(); for (int i = 0; i < rootCount; i++) { int length = buffer.getInt(); @@ -102,6 +102,8 @@ public final class ArrowUtil { ByteArrayInputStream rootIn = new ByteArrayInputStream(data, buffer.position(), length); buffer.position(buffer.position() + length); + // Note: reader lifecycle is tied to the returned VectorSchemaRoot — + // closing the reader would invalidate the root's vectors. ArrowStreamReader reader = new ArrowStreamReader( Channels.newChannel(rootIn), allocator); @@ -131,17 +133,34 @@ public final class ArrowUtil { if (data == null || data.length == 0) { return new ArrayList<>(); } - List<VectorSchemaRoot> roots = deserializeMultipleRoots(data); - List<CollectRep.MetricsData> metricsDataList = new ArrayList<>(roots.size()); + List<CollectRep.MetricsData> metricsDataList = new ArrayList<>(); + ByteBuffer buffer = ByteBuffer.wrap(data); try { - for (VectorSchemaRoot root : roots) { - if (root != null) { - CollectRep.MetricsData metricsData = new CollectRep.MetricsData(root); - metricsDataList.add(metricsData); + int rootCount = buffer.getInt(); + for (int i = 0; i < rootCount; i++) { + int length = buffer.getInt(); + ByteArrayInputStream rootIn = new ByteArrayInputStream(data, buffer.position(), length); + buffer.position(buffer.position() + length); + // Each MetricsData gets its own allocator so they can be closed independently. + // The reader is intentionally not closed here — its lifecycle is tied to the root, + // and closing it would invalidate the root's vectors. The allocator.close() in + // MetricsData.close() will reclaim all memory allocated by both the reader and root. 
+ RootAllocator allocator = new RootAllocator(); + ArrowStreamReader reader = new ArrowStreamReader(Channels.newChannel(rootIn), allocator); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + reader.loadNextBatch(); + metricsDataList.add(new CollectRep.MetricsData(root, allocator)); + } + } catch (IOException e) { + // Clean up any MetricsData objects created before the failure to prevent memory leaks + for (CollectRep.MetricsData metricsData : metricsDataList) { + try { + metricsData.close(); + } catch (Exception closeEx) { + log.warn("Failed to close MetricsData during error cleanup: {}", closeEx.getMessage()); } } - } finally { - roots.forEach(VectorSchemaRoot::close); + throw new RuntimeException("Failed to deserialize metrics data", e); } return metricsDataList; } diff --git a/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java b/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java index beb0d2cd9b..d74fe2b29a 100644 --- a/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java +++ b/hertzbeat-common-core/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java @@ -67,8 +67,9 @@ class ArrowUtilTest { // Serialize byte[] data = ArrowUtil.serializeMultipleRoots(roots); - // Deserialize - List<VectorSchemaRoot> deserializedRoots = ArrowUtil.deserializeMultipleRoots(data); + // Deserialize — pass a fresh allocator for deserialization + RootAllocator deserializeAllocator = new RootAllocator(); + List<VectorSchemaRoot> deserializedRoots = ArrowUtil.deserializeMultipleRoots(data, deserializeAllocator); Assertions.assertEquals(2, deserializedRoots.size()); VectorSchemaRoot resultRoot1 = deserializedRoots.get(0); @@ -81,6 +82,7 @@ class ArrowUtilTest { // Cleanup roots.forEach(VectorSchemaRoot::close); deserializedRoots.forEach(VectorSchemaRoot::close); + deserializeAllocator.close(); allocator.close(); } 
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
