This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new dc51ce7bef [fix]Fixed position offset issue during deserialization of ArrowUtil (#3897)
dc51ce7bef is described below

commit dc51ce7bef7170d695be22c13f0a92b787fdc75c
Author: Duansg <[email protected]>
AuthorDate: Tue Dec 9 21:46:36 2025 +0800

    [fix]Fixed position offset issue during deserialization of ArrowUtil (#3897)
    
    Co-authored-by: Tomsun28 <[email protected]>
---
 .../apache/hertzbeat/common/util/ArrowUtil.java    |  72 +++++++-----
 .../hertzbeat/common/util/ArrowUtilTest.java       | 123 +++++++++++++++++++++
 2 files changed, 166 insertions(+), 29 deletions(-)

diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
index f2a04b6af9..54b5143187 100644
--- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
@@ -17,27 +17,28 @@
 
 package org.apache.hertzbeat.common.util;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
 
 /**
  * Arrow data serialization and deserialization utility class
  */
 @Slf4j
 public final class ArrowUtil {
-    
+
     private ArrowUtil() {
     }
 
@@ -53,18 +54,22 @@ public final class ArrowUtil {
     public static byte[] serializeMultipleRoots(List<VectorSchemaRoot> roots) {
         try (ByteArrayOutputStream out = new ByteArrayOutputStream();
              DataOutputStream dataOut = new DataOutputStream(out)) {
-            
+
             dataOut.writeInt(roots.size());
             for (VectorSchemaRoot root : roots) {
-                ArrowStreamWriter writer = new ArrowStreamWriter(
-                        root,
-                        null,
-                        Channels.newChannel(out));
-                writer.start();
-                writer.writeBatch();
-                writer.end();
-                writer.close();
-                root.close();
+                // Use a temporary stream to obtain the precise byte length of a single root,
+                // write the length, and resolve the pre-read issue.
+                try (ByteArrayOutputStream tempOut = new 
ByteArrayOutputStream()) {
+                    try (ArrowStreamWriter writer = new 
ArrowStreamWriter(root, null, Channels.newChannel(tempOut))) {
+                        writer.start();
+                        writer.writeBatch();
+                        writer.end();
+                    }
+                    int size = tempOut.size();
+                    dataOut.writeInt(size);
+                    dataOut.flush();
+                    tempOut.writeTo(out);
+                }
             }
             return out.toByteArray();
         } catch (IOException e) {
@@ -85,15 +90,20 @@ public final class ArrowUtil {
      */
     public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data) 
{
         List<VectorSchemaRoot> roots = new ArrayList<>();
-        try (ByteArrayInputStream in = new ByteArrayInputStream(data);
-             DataInputStream dataIn = new DataInputStream(in)) {
-            
-            int rootCount = dataIn.readInt();
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        try {
+            int rootCount = buffer.getInt();
             RootAllocator allocator = new RootAllocator();
-            
+
             for (int i = 0; i < rootCount; i++) {
+                int length = buffer.getInt();
+
+                // Split the InputStream to prevent the Reader from reading beyond its bounds.
+                ByteArrayInputStream rootIn = new ByteArrayInputStream(data, 
buffer.position(), length);
+                buffer.position(buffer.position() + length);
+
                 ArrowStreamReader reader = new ArrowStreamReader(
-                        Channels.newChannel(in),
+                        Channels.newChannel(rootIn),
                         allocator);
                 VectorSchemaRoot root = reader.getVectorSchemaRoot();
                 reader.loadNextBatch();
@@ -147,11 +157,15 @@ public final class ArrowUtil {
      */
     public static byte[] serializeMetricsData(List<CollectRep.MetricsData> 
metricsDataList) {
         List<VectorSchemaRoot> roots = new ArrayList<>(metricsDataList.size());
-        for (CollectRep.MetricsData metricsData : metricsDataList) {
-            VectorSchemaRoot root = metricsData.toVectorSchemaRootAndRelease();
-            roots.add(root);
+        try {
+            for (CollectRep.MetricsData metricsData : metricsDataList) {
+                VectorSchemaRoot root = 
metricsData.toVectorSchemaRootAndRelease();
+                roots.add(root);
+            }
+            return serializeMultipleRoots(roots);
+        } finally {
+            roots.forEach(VectorSchemaRoot::close);
         }
-        return serializeMultipleRoots(roots);
     }
-    
+
 }
diff --git a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
new file mode 100644
index 0000000000..beb0d2cd9b
--- /dev/null
+++ b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import com.google.common.collect.Lists;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test case for {@link ArrowUtil}
+ */
+class ArrowUtilTest {
+
+    @Test
+    void testSerializeAndDeserializeMultipleRoots() {
+        RootAllocator allocator = new RootAllocator();
+        List<VectorSchemaRoot> roots = new ArrayList<>();
+
+        // Create first root
+        Field field1 = new Field("field1", FieldType.nullable(new 
ArrowType.Int(64, true)), null);
+        Schema schema1 = new Schema(Collections.singletonList(field1));
+        VectorSchemaRoot root1 = VectorSchemaRoot.create(schema1, allocator);
+        BigIntVector vector1 = (BigIntVector) root1.getVector("field1");
+        vector1.allocateNew(10);
+        vector1.setSafe(0, 100L);
+        vector1.setValueCount(1);
+        root1.setRowCount(1);
+        roots.add(root1);
+
+        // Create second root
+        Field field2 = new Field("field2", FieldType.nullable(new 
ArrowType.Int(64, true)), null);
+        Schema schema2 = new Schema(Collections.singletonList(field2));
+        VectorSchemaRoot root2 = VectorSchemaRoot.create(schema2, allocator);
+        BigIntVector vector2 = (BigIntVector) root2.getVector("field2");
+        vector2.allocateNew(10);
+        vector2.setSafe(0, 200L);
+        vector2.setValueCount(1);
+        root2.setRowCount(1);
+        roots.add(root2);
+
+        // Serialize
+        byte[] data = ArrowUtil.serializeMultipleRoots(roots);
+        // Deserialize
+        List<VectorSchemaRoot> deserializedRoots = 
ArrowUtil.deserializeMultipleRoots(data);
+
+        Assertions.assertEquals(2, deserializedRoots.size());
+        VectorSchemaRoot resultRoot1 = deserializedRoots.get(0);
+        Assertions.assertEquals(1, resultRoot1.getRowCount());
+        Assertions.assertEquals(100L, ((BigIntVector) 
resultRoot1.getVector("field1")).get(0));
+        VectorSchemaRoot resultRoot2 = deserializedRoots.get(1);
+        Assertions.assertEquals(1, resultRoot2.getRowCount());
+        Assertions.assertEquals(200L, ((BigIntVector) 
resultRoot2.getVector("field2")).get(0));
+
+        // Cleanup
+        roots.forEach(VectorSchemaRoot::close);
+        deserializedRoots.forEach(VectorSchemaRoot::close);
+        allocator.close();
+    }
+
+    @Test
+    void testSerializeAndDeserializeMetricsData() {
+        CollectRep.MetricsData metricsData = 
CollectRep.MetricsData.newBuilder()
+            .setId(1L)
+            .setApp("linux")
+            .setMetrics("cpu")
+            .setTime(System.currentTimeMillis())
+            
.addField(CollectRep.Field.newBuilder().setName("usage").setType(1).build())
+            
.addValueRow(CollectRep.ValueRow.newBuilder().addColumn("50.5").build())
+            .build();
+        CollectRep.MetricsData metricsData1 = 
CollectRep.MetricsData.newBuilder()
+            .setId(1L)
+            .setApp("linux_1")
+            .setMetrics("cpu")
+            .setTime(System.currentTimeMillis())
+            
.addField(CollectRep.Field.newBuilder().setName("usage").setType(1).build())
+            
.addValueRow(CollectRep.ValueRow.newBuilder().addColumn("60.5").build())
+            .build();
+        List<CollectRep.MetricsData> list = Lists.newArrayList(metricsData, 
metricsData1);
+
+        // Serialize
+        byte[] data = ArrowUtil.serializeMetricsData(list);
+        // Deserialize
+        List<CollectRep.MetricsData> deserializedList = 
ArrowUtil.deserializeMetricsData(data);
+
+        Assertions.assertEquals(2, deserializedList.size());
+        CollectRep.MetricsData result = deserializedList.get(0);
+        Assertions.assertEquals("linux", result.getApp());
+        Assertions.assertEquals(1, result.getValues().size());
+        Assertions.assertEquals("50.5", 
result.getValues().get(0).getColumns(0));
+
+        result = deserializedList.get(1);
+        Assertions.assertEquals("linux_1", result.getApp());
+        Assertions.assertEquals(1, result.getValues().size());
+        Assertions.assertEquals("60.5", 
result.getValues().get(0).getColumns(0));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to