This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new dc51ce7bef [fix]Fixed position offset issue during deserialization of ArrowUtil (#3897)
dc51ce7bef is described below
commit dc51ce7bef7170d695be22c13f0a92b787fdc75c
Author: Duansg <[email protected]>
AuthorDate: Tue Dec 9 21:46:36 2025 +0800
[fix]Fixed position offset issue during deserialization of ArrowUtil (#3897)
Co-authored-by: Tomsun28 <[email protected]>
---
.../apache/hertzbeat/common/util/ArrowUtil.java | 72 +++++++-----
.../hertzbeat/common/util/ArrowUtilTest.java | 123 +++++++++++++++++++++
2 files changed, 166 insertions(+), 29 deletions(-)
diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
index f2a04b6af9..54b5143187 100644
--- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
+++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/util/ArrowUtil.java
@@ -17,27 +17,28 @@
package org.apache.hertzbeat.common.util;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
/**
* Arrow data serialization and deserialization utility class
*/
@Slf4j
public final class ArrowUtil {
-
+
private ArrowUtil() {
}
@@ -53,18 +54,22 @@ public final class ArrowUtil {
public static byte[] serializeMultipleRoots(List<VectorSchemaRoot> roots) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out)) {
-
+
dataOut.writeInt(roots.size());
for (VectorSchemaRoot root : roots) {
- ArrowStreamWriter writer = new ArrowStreamWriter(
- root,
- null,
- Channels.newChannel(out));
- writer.start();
- writer.writeBatch();
- writer.end();
- writer.close();
- root.close();
+ // Use a temporary stream to obtain the precise byte length of a single root,
+ // write the length, and resolve the pre-read issue.
+ try (ByteArrayOutputStream tempOut = new ByteArrayOutputStream()) {
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(tempOut))) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ }
+ int size = tempOut.size();
+ dataOut.writeInt(size);
+ dataOut.flush();
+ tempOut.writeTo(out);
+ }
}
return out.toByteArray();
} catch (IOException e) {
@@ -85,15 +90,20 @@ public final class ArrowUtil {
*/
public static List<VectorSchemaRoot> deserializeMultipleRoots(byte[] data) {
List<VectorSchemaRoot> roots = new ArrayList<>();
- try (ByteArrayInputStream in = new ByteArrayInputStream(data);
- DataInputStream dataIn = new DataInputStream(in)) {
-
- int rootCount = dataIn.readInt();
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ try {
+ int rootCount = buffer.getInt();
RootAllocator allocator = new RootAllocator();
-
+
for (int i = 0; i < rootCount; i++) {
+ int length = buffer.getInt();
+
+ // Split the InputStream to prevent the Reader from reading beyond its bounds.
+ ByteArrayInputStream rootIn = new ByteArrayInputStream(data, buffer.position(), length);
+ buffer.position(buffer.position() + length);
+
ArrowStreamReader reader = new ArrowStreamReader(
- Channels.newChannel(in),
+ Channels.newChannel(rootIn),
allocator);
VectorSchemaRoot root = reader.getVectorSchemaRoot();
reader.loadNextBatch();
@@ -147,11 +157,15 @@ public final class ArrowUtil {
*/
public static byte[] serializeMetricsData(List<CollectRep.MetricsData> metricsDataList) {
List<VectorSchemaRoot> roots = new ArrayList<>(metricsDataList.size());
- for (CollectRep.MetricsData metricsData : metricsDataList) {
- VectorSchemaRoot root = metricsData.toVectorSchemaRootAndRelease();
- roots.add(root);
+ try {
+ for (CollectRep.MetricsData metricsData : metricsDataList) {
+ VectorSchemaRoot root = metricsData.toVectorSchemaRootAndRelease();
+ roots.add(root);
+ }
+ return serializeMultipleRoots(roots);
+ } finally {
+ roots.forEach(VectorSchemaRoot::close);
}
- return serializeMultipleRoots(roots);
}
-
+
}
diff --git a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
new file mode 100644
index 0000000000..beb0d2cd9b
--- /dev/null
+++ b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/util/ArrowUtilTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.util;
+
+import com.google.common.collect.Lists;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test case for {@link ArrowUtil}
+ */
+class ArrowUtilTest {
+
+ @Test
+ void testSerializeAndDeserializeMultipleRoots() {
+ RootAllocator allocator = new RootAllocator();
+ List<VectorSchemaRoot> roots = new ArrayList<>();
+
+ // Create first root
+ Field field1 = new Field("field1", FieldType.nullable(new ArrowType.Int(64, true)), null);
+ Schema schema1 = new Schema(Collections.singletonList(field1));
+ VectorSchemaRoot root1 = VectorSchemaRoot.create(schema1, allocator);
+ BigIntVector vector1 = (BigIntVector) root1.getVector("field1");
+ vector1.allocateNew(10);
+ vector1.setSafe(0, 100L);
+ vector1.setValueCount(1);
+ root1.setRowCount(1);
+ roots.add(root1);
+
+ // Create second root
+ Field field2 = new Field("field2", FieldType.nullable(new ArrowType.Int(64, true)), null);
+ Schema schema2 = new Schema(Collections.singletonList(field2));
+ VectorSchemaRoot root2 = VectorSchemaRoot.create(schema2, allocator);
+ BigIntVector vector2 = (BigIntVector) root2.getVector("field2");
+ vector2.allocateNew(10);
+ vector2.setSafe(0, 200L);
+ vector2.setValueCount(1);
+ root2.setRowCount(1);
+ roots.add(root2);
+
+ // Serialize
+ byte[] data = ArrowUtil.serializeMultipleRoots(roots);
+ // Deserialize
+ List<VectorSchemaRoot> deserializedRoots = ArrowUtil.deserializeMultipleRoots(data);
+
+ Assertions.assertEquals(2, deserializedRoots.size());
+ VectorSchemaRoot resultRoot1 = deserializedRoots.get(0);
+ Assertions.assertEquals(1, resultRoot1.getRowCount());
+ Assertions.assertEquals(100L, ((BigIntVector) resultRoot1.getVector("field1")).get(0));
+ VectorSchemaRoot resultRoot2 = deserializedRoots.get(1);
+ Assertions.assertEquals(1, resultRoot2.getRowCount());
+ Assertions.assertEquals(200L, ((BigIntVector) resultRoot2.getVector("field2")).get(0));
+
+ // Cleanup
+ roots.forEach(VectorSchemaRoot::close);
+ deserializedRoots.forEach(VectorSchemaRoot::close);
+ allocator.close();
+ }
+
+ @Test
+ void testSerializeAndDeserializeMetricsData() {
+ CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder()
+ .setId(1L)
+ .setApp("linux")
+ .setMetrics("cpu")
+ .setTime(System.currentTimeMillis())
+ .addField(CollectRep.Field.newBuilder().setName("usage").setType(1).build())
+ .addValueRow(CollectRep.ValueRow.newBuilder().addColumn("50.5").build())
+ .build();
+ CollectRep.MetricsData metricsData1 = CollectRep.MetricsData.newBuilder()
+ .setId(1L)
+ .setApp("linux_1")
+ .setMetrics("cpu")
+ .setTime(System.currentTimeMillis())
+ .addField(CollectRep.Field.newBuilder().setName("usage").setType(1).build())
+ .addValueRow(CollectRep.ValueRow.newBuilder().addColumn("60.5").build())
+ .build();
+ List<CollectRep.MetricsData> list = Lists.newArrayList(metricsData, metricsData1);
+
+ // Serialize
+ byte[] data = ArrowUtil.serializeMetricsData(list);
+ // Deserialize
+ List<CollectRep.MetricsData> deserializedList = ArrowUtil.deserializeMetricsData(data);
+
+ Assertions.assertEquals(2, deserializedList.size());
+ CollectRep.MetricsData result = deserializedList.get(0);
+ Assertions.assertEquals("linux", result.getApp());
+ Assertions.assertEquals(1, result.getValues().size());
+ Assertions.assertEquals("50.5", result.getValues().get(0).getColumns(0));
+
+ result = deserializedList.get(1);
+ Assertions.assertEquals("linux_1", result.getApp());
+ Assertions.assertEquals(1, result.getValues().size());
+ Assertions.assertEquals("60.5", result.getValues().get(0).getColumns(0));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]