This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 9a96d1577fc567f872aacd4bd86c40d27b87c50c
Author: Megrez Lu <[email protected]>
AuthorDate: Sun May 1 22:07:20 2022 +0800

    add metadata integration
---
 .../banyandb/{stream => }/BanyanDBBatchDAO.java    |  36 ++-
 .../storage/plugin/banyandb/BanyanDBConverter.java |  95 ++++--
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |   6 +-
 .../plugin/banyandb/BanyanDBStorageClient.java     |   7 +-
 .../plugin/banyandb/BanyanDBStorageProvider.java   |   1 -
 .../storage/plugin/banyandb/MetadataRegistry.java  |   6 +-
 .../BanyanDBMeasureInsertRequest.java}             |  23 +-
 .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 325 ++++++++++++++++++++-
 .../banyandb/{ => measure}/BanyanDBMetricsDAO.java |  11 +-
 .../banyandb/stream/BanyanDBAlarmQueryDAO.java     |   2 +-
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |   4 +-
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |   2 +-
 .../plugin/banyandb/stream/BanyanDBStorageDAO.java |   2 +-
 .../banyandb/stream/BanyanDBTraceQueryDAO.java     |   2 +-
 .../storage/plugin/banyandb/util/ByteUtil.java     |  19 +-
 .../plugin/banyandb/util/ByteUtilTest.java}        |  27 +-
 16 files changed, 485 insertions(+), 83 deletions(-)

diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
similarity index 61%
rename from 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
rename to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index e17b8b66ac..bb776c6dfe 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -16,25 +16,35 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> 
implements IBatchDAO {
-    private StreamBulkWriteProcessor bulkProcessor;
+    private StreamBulkWriteProcessor streamBulkWriteProcessor;
+
+    private MeasureBulkWriteProcessor measureBulkWriteProcessor;
+
     private final int maxBulkSize;
+
     private final int flushInterval;
+
     private final int concurrency;
 
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
     public BanyanDBBatchDAO(BanyanDBStorageClient client, int maxBulkSize, int 
flushInterval, int concurrency) {
         super(client);
         this.maxBulkSize = maxBulkSize;
@@ -44,27 +54,31 @@ public class BanyanDBBatchDAO extends 
AbstractDAO<BanyanDBStorageClient> impleme
 
     @Override
     public void insert(InsertRequest insertRequest) {
-        if (bulkProcessor == null) {
-            this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, 
flushInterval, concurrency);
+        if (initialized.compareAndSet(false, true)) {
+            this.streamBulkWriteProcessor = 
getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+            this.measureBulkWriteProcessor = 
getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
         }
-
         if (insertRequest instanceof BanyanDBStreamInsertRequest) {
-            this.bulkProcessor.add(((BanyanDBStreamInsertRequest) 
insertRequest).getStreamWrite());
+            this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) 
insertRequest).getStreamWrite());
+        } else if (insertRequest instanceof BanyanDBMeasureInsertRequest) {
+            this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) 
insertRequest).getMeasureWrite());
         }
     }
 
     @Override
     public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) 
{
-        if (bulkProcessor == null) {
-            this.bulkProcessor = getClient().createBulkProcessor(maxBulkSize, 
flushInterval, concurrency);
+        if (initialized.compareAndSet(false, true)) {
+            this.streamBulkWriteProcessor = 
getClient().createStreamBulkProcessor(maxBulkSize, flushInterval, concurrency);
+            this.measureBulkWriteProcessor = 
getClient().createMeasureBulkProcessor(maxBulkSize, flushInterval, concurrency);
         }
 
         if (CollectionUtils.isNotEmpty(prepareRequests)) {
             return 
CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
                 if (prepareRequest instanceof BanyanDBStreamInsertRequest) {
                     // TODO: return CompletableFuture<Void>
-                    this.bulkProcessor.add(((BanyanDBStreamInsertRequest) 
prepareRequest).getStreamWrite());
-                } else {
+                    
this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) 
prepareRequest).getStreamWrite());
+                } else if (prepareRequest instanceof 
BanyanDBMeasureInsertRequest) {
+                    
this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) 
prepareRequest).getMeasureWrite());
                 }
                 return CompletableFuture.completedFuture(null);
             }).toArray(CompletableFuture[]::new));
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index c3c936ef84..fe4d15f9ea 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -18,15 +18,18 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import com.google.gson.JsonObject;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
 import org.apache.skywalking.banyandb.v1.client.RowEntity;
 import org.apache.skywalking.banyandb.v1.client.StreamWrite;
 import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
 import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
@@ -37,11 +40,15 @@ import java.util.List;
 import java.util.function.Function;
 
 public class BanyanDBConverter {
-    @RequiredArgsConstructor
-    public static class StreamToEntity implements Convert2Entity {
+    public static class StorageToStream implements Convert2Entity {
         private final MetadataRegistry.Schema schema;
         private final RowEntity rowEntity;
 
+        public StorageToStream(String modelName, RowEntity rowEntity) {
+            this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
+            this.rowEntity = rowEntity;
+        }
+
         @Override
         public Object get(String fieldName) {
             MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
@@ -61,13 +68,17 @@ public class BanyanDBConverter {
     @Slf4j
     @RequiredArgsConstructor
     public static class StreamToStorage implements 
Convert2Storage<StreamWrite> {
+        private final MetadataRegistry.Schema schema;
         private final StreamWrite streamWrite;
 
         @Override
         public void accept(String fieldName, Object fieldValue) {
-            // TODO: skip "time_bucket"
+            MetadataRegistry.ColumnSpec columnSpec = 
this.schema.getSpec(fieldName);
+            if (columnSpec == null) {
+                throw new IllegalArgumentException("fail to find field[" + 
fieldName + "]");
+            }
             try {
-                this.streamWrite.tag(fieldName, buildTag(fieldValue));
+                this.streamWrite.tag(fieldName, buildTag(fieldValue, 
columnSpec.getColumnClass()));
             } catch (BanyanDBException ex) {
                 log.error("fail to add tag", ex);
             }
@@ -118,17 +129,28 @@ public class BanyanDBConverter {
         @Override
         public void accept(String fieldName, Object fieldValue) {
             MetadataRegistry.ColumnSpec columnSpec = 
this.schema.getSpec(fieldName);
+            if (columnSpec == null) {
+                throw new IllegalArgumentException("fail to find field[" + 
fieldName + "]");
+            }
             try {
                 if (columnSpec.getColumnType() == 
MetadataRegistry.ColumnType.TAG) {
-                    this.measureWrite.tag(fieldName, buildTag(fieldValue));
+                    this.measureWrite.tag(fieldName, buildTag(fieldValue, 
columnSpec.getColumnClass()));
                 } else {
-                    this.measureWrite.field(fieldName, buildField(fieldValue));
+                    this.measureWrite.field(fieldName, buildField(fieldValue, 
columnSpec.getColumnClass()));
                 }
             } catch (BanyanDBException ex) {
                 log.error("fail to add tag", ex);
             }
         }
 
+        public void acceptID(String id) {
+            try {
+                this.measureWrite.tag(MetadataRegistry.ID, 
TagAndValue.idTagValue(id));
+            } catch (BanyanDBException ex) {
+                log.error("fail to add ID tag", ex);
+            }
+        }
+
         @Override
         public void accept(String fieldName, byte[] fieldValue) {
             MetadataRegistry.ColumnSpec columnSpec = 
this.schema.getSpec(fieldName);
@@ -170,29 +192,62 @@ public class BanyanDBConverter {
         }
     }
 
-    private static Serializable<BanyandbModel.TagValue> buildTag(Object value) 
{
-        if (Integer.class.equals(value.getClass()) || 
Long.class.equals(value.getClass())) {
-            return TagAndValue.longTagValue((long) value);
-        } else if (String.class.equals(value.getClass())) {
+    private static Serializable<BanyandbModel.TagValue> buildTag(Object value, 
final Class<?> clazz) {
+        if (Integer.class.equals(clazz) || int.class.equals(clazz)) {
+            return TagAndValue.longTagValue(((Number) value).longValue());
+        } else if (Long.class.equals(clazz) || long.class.equals(clazz)) {
+            return TagAndValue.longTagValue((Long) value);
+        } else if (String.class.equals(clazz)) {
             return TagAndValue.stringTagValue((String) value);
-        } else if (Double.class.equals(value.getClass())) {
+        } else if (Double.class.equals(clazz) || double.class.equals(clazz)) {
             return TagAndValue.binaryTagValue(ByteUtil.double2Bytes((double) 
value));
-        } else if (value instanceof StorageDataComplexObject) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) {
             return TagAndValue.stringTagValue(((StorageDataComplexObject<?>) 
value).toStorageData());
+        } else if (Layer.class.equals(clazz)) {
+            return TagAndValue.longTagValue(((Integer) value).longValue());
+        } else if (JsonObject.class.equals(clazz)) {
+            return TagAndValue.stringTagValue((String) value);
         }
-        throw new IllegalStateException(value.getClass() + " is not 
supported");
+        throw new IllegalStateException(clazz.getSimpleName() + " is not 
supported");
     }
 
-    private static Serializable<BanyandbModel.FieldValue> buildField(Object 
value) {
-        if (Integer.class.equals(value.getClass()) || 
Long.class.equals(value.getClass())) {
-            return TagAndValue.longFieldValue((long) value);
-        } else if (String.class.equals(value.getClass())) {
+    private static Serializable<BanyandbModel.FieldValue> buildField(Object 
value, final Class<?> clazz) {
+        if (Integer.class.equals(clazz) || int.class.equals(clazz)) {
+            return TagAndValue.longFieldValue(((Number) value).longValue());
+        } else if (Long.class.equals(clazz) || long.class.equals(clazz)) {
+            return TagAndValue.longFieldValue((Long) value);
+        } else if (String.class.equals(clazz)) {
             return TagAndValue.stringFieldValue((String) value);
-        } else if (Double.class.equals(value.getClass())) {
+        } else if (Double.class.equals(clazz) || double.class.equals(clazz)) {
             return TagAndValue.binaryFieldValue(ByteUtil.double2Bytes((double) 
value));
-        } else if (value instanceof StorageDataComplexObject) {
+        } else if (StorageDataComplexObject.class.isAssignableFrom(clazz)) {
             return TagAndValue.stringFieldValue(((StorageDataComplexObject<?>) 
value).toStorageData());
         }
-        throw new IllegalStateException(value.getClass() + " is not 
supported");
+        throw new IllegalStateException(clazz.getSimpleName() + " is not 
supported");
+    }
+
+    public static class StorageToMeasure implements Convert2Entity {
+        private final MetadataRegistry.Schema schema;
+        private final DataPoint dataPoint;
+
+        public StorageToMeasure(String modelName, DataPoint dataPoint) {
+            this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
+            this.dataPoint = dataPoint;
+        }
+
+        @Override
+        public Object get(String fieldName) {
+            MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
+            if (double.class.equals(spec.getColumnClass())) {
+                return ByteUtil.bytes2Double(dataPoint.getTagValue(fieldName));
+            } else {
+                return dataPoint.getTagValue(fieldName);
+            }
+        }
+
+        @Override
+        public <T, R> R getWith(String fieldName, Function<T, R> typeDecoder) {
+            return (R) this.get(fieldName);
+        }
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index 75109aa781..17d284c9a4 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -57,7 +57,11 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
             }
             log.info("group {} created", g.name());
             // then check entity schema
-            return metadata.findRemoteSchema(c).isPresent();
+            if (metadata.findRemoteSchema(c).isPresent()) {
+                MetadataRegistry.INSTANCE.registerModel(model, 
this.configService);
+                return true;
+            }
+            return false;
         } catch (BanyanDBException ex) {
             throw new StorageException("fail to check existence", ex);
         }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index a92c270023..9ba378125c 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -20,6 +20,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import io.grpc.Status;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
+import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
 import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
@@ -149,10 +150,14 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         this.client.write(streamWrite);
     }
 
-    public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int 
flushInterval, int concurrency) {
+    public StreamBulkWriteProcessor createStreamBulkProcessor(int maxBulkSize, 
int flushInterval, int concurrency) {
         return this.client.buildStreamWriteProcessor(maxBulkSize, 
flushInterval, concurrency);
     }
 
+    public MeasureBulkWriteProcessor createMeasureBulkProcessor(int 
maxBulkSize, int flushInterval, int concurrency) {
+        return this.client.buildMeasureWriteProcessor(maxBulkSize, 
flushInterval, concurrency);
+    }
+
     @Override
     public void registerChecker(HealthChecker healthChecker) {
         this.healthChecker.register(healthChecker);
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 26c6b2529a..6bff41e16f 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -57,7 +57,6 @@ import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
-import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO;
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index f0b88fe538..18767ae21e 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,7 +67,7 @@ import java.util.stream.Collectors;
 public enum MetadataRegistry {
     INSTANCE;
 
-    private static final String ID = "id";
+    public static final String ID = "id";
     private final Map<String, Schema> registry = new ConcurrentHashMap<>();
 
     public NamedSchema<?> registerModel(Model model, ConfigService 
configService) {
@@ -91,8 +91,7 @@ public enum MetadataRegistry {
         if (partialMetadata.getKind() == Kind.STREAM) {
             final Stream.Builder builder = 
Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
             if (entities.isEmpty()) {
-                log.warn("sharding keys of model[stream.{}] must not be 
empty", model.getName());
-//            throw new IllegalStateException("sharding keys of model[" + 
model.getName() + "] must not be empty");
+                throw new IllegalStateException("sharding keys of 
model[stream." + model.getName() + "] must not be empty");
             }
             builder.setEntityRelativeTags(entities);
             builder.addTagFamilies(tagFamilySpecs);
@@ -401,6 +400,7 @@ public enum MetadataRegistry {
         private final PartialMetadata metadata;
         @Singular
         private final Map<String, ColumnSpec> specs;
+        private final boolean useIdAsShardingKey;
 
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
similarity index 67%
copy from 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
copy to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
index 5ae1102e38..3f965bff6f 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureInsertRequest.java
@@ -16,18 +16,15 @@
  *
  */
 
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
-import java.nio.ByteBuffer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 
-public class ByteUtil {
-    private static final ThreadLocal<ByteBuffer> BYTE_BUFFER = 
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
-
-    public static Double bytes2Double(byte[] bytes) {
-        return BYTE_BUFFER.get().put(bytes).getDouble();
-    }
-
-    public static byte[] double2Bytes(double number) {
-        return BYTE_BUFFER.get().putDouble(number).array();
-    }
-}
+@RequiredArgsConstructor
+@Getter
+public class BanyanDBMeasureInsertRequest implements InsertRequest {
+    private final MeasureWrite measureWrite;
+}
\ No newline at end of file
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 23828464da..23f096115b 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -18,10 +18,26 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
 import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
+import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
+import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.Layer;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType;
+import 
org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic;
+import org.apache.skywalking.oap.server.core.query.enumeration.Language;
+import 
org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
+import org.apache.skywalking.oap.server.core.query.type.Attribute;
 import org.apache.skywalking.oap.server.core.query.type.Endpoint;
 import org.apache.skywalking.oap.server.core.query.type.Process;
 import org.apache.skywalking.oap.server.core.query.type.Service;
@@ -29,13 +45,22 @@ import 
org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
 import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE;
 
 public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements 
IMetadataQueryDAO {
+    private static final Gson GSON = new Gson();
+
     public BanyanDBMetadataQueryDAO(BanyanDBStorageClient client) {
         super(client);
     }
@@ -43,7 +68,12 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
     @Override
     public List<Service> listServices(String layer, String group) throws 
IOException {
         MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
-                ImmutableSet.of(ServiceTraffic.NAME, 
ServiceTraffic.SHORT_NAME),
+                ImmutableSet.of(ServiceTraffic.NAME,
+                        ServiceTraffic.SHORT_NAME,
+                        ServiceTraffic.GROUP,
+                        ServiceTraffic.LAYER,
+                        ServiceTraffic.SERVICE_ID,
+                        MetadataRegistry.ID),
                 Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
                     @Override
                     protected void apply(MeasureQuery query) {
@@ -56,41 +86,314 @@ public class BanyanDBMetadataQueryDAO extends 
AbstractBanyanDBDAO implements IMe
                     }
                 });
 
-        return Collections.emptyList();
+        final List<Service> services = new ArrayList<>();
+
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            services.add(buildService(dataPoint));
+        }
+
+        return services;
     }
 
     @Override
     public List<Service> getServices(String serviceId) throws IOException {
-        return Collections.emptyList();
+        MeasureQueryResponse resp = query(ServiceTraffic.INDEX_NAME,
+                ImmutableSet.of(ServiceTraffic.NAME,
+                        ServiceTraffic.SHORT_NAME,
+                        ServiceTraffic.GROUP,
+                        ServiceTraffic.LAYER,
+                        ServiceTraffic.SERVICE_ID,
+                        MetadataRegistry.ID),
+                Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            
query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId));
+                        }
+                    }
+                });
+
+        final List<Service> services = new ArrayList<>();
+
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            services.add(buildService(dataPoint));
+        }
+
+        return services;
     }
 
     @Override
     public List<ServiceInstance> listInstances(long startTimestamp, long 
endTimestamp, String serviceId) throws IOException {
-        return Collections.emptyList();
+        final long minuteTimeBucket = 
TimeBucket.getMinuteTimeBucket(startTimestamp);
+
+        MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+                ImmutableSet.of(InstanceTraffic.NAME,
+                        InstanceTraffic.LAYER,
+                        InstanceTraffic.PROPERTIES,
+                        InstanceTraffic.LAST_PING_TIME_BUCKET,
+                        InstanceTraffic.SERVICE_ID,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            
query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
+                        }
+                        
query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, 
minuteTimeBucket));
+                    }
+                });
+
+        final List<ServiceInstance> instances = new ArrayList<>();
+
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            instances.add(buildInstance(dataPoint));
+        }
+
+        return instances;
     }
 
     @Override
     public ServiceInstance getInstance(String instanceId) throws IOException {
-        return null;
+        MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
+                ImmutableSet.of(InstanceTraffic.NAME,
+                        InstanceTraffic.LAYER,
+                        InstanceTraffic.PROPERTIES,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(instanceId)) {
+                            
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
 instanceId));
+                        }
+                    }
+                });
+
+        return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0)) : 
null;
     }
 
     @Override
     public List<Endpoint> findEndpoint(String keyword, String serviceId, int 
limit) throws IOException {
-        return Collections.emptyList();
+        MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
+                ImmutableSet.of(EndpointTraffic.NAME,
+                        EndpointTraffic.SERVICE_ID,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            
query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
+                        }
+                    }
+                });
+
+        final List<Endpoint> endpoints = new ArrayList<>();
+
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            endpoints.add(buildEndpoint(dataPoint));
+        }
+
+        if (StringUtil.isNotEmpty(serviceId)) {
+            return endpoints.stream().filter(e -> 
e.getName().contains(keyword)).collect(Collectors.toList());
+        }
+        return endpoints;
     }
 
     @Override
-    public List<Process> listProcesses(String serviceId, String instanceId, 
String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) 
throws IOException {
-        return Collections.emptyList();
+    public List<Process> listProcesses(String serviceId, String instanceId, 
String agentId, ProfilingSupportStatus profilingSupportStatus, long 
lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
+        MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+                ImmutableSet.of(ProcessTraffic.NAME,
+                        ProcessTraffic.SERVICE_ID,
+                        ProcessTraffic.INSTANCE_ID,
+                        ProcessTraffic.AGENT_ID,
+                        ProcessTraffic.LAYER,
+                        ProcessTraffic.DETECT_TYPE,
+                        ProcessTraffic.PROPERTIES,
+                        ProcessTraffic.LABELS_JSON,
+                        ProcessTraffic.LAST_PING_TIME_BUCKET,
+                        ProcessTraffic.PROFILING_SUPPORT_STATUS,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+                        }
+                        if (StringUtil.isNotEmpty(instanceId)) {
+                            
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+                        }
+                        if (StringUtil.isNotEmpty(agentId)) {
+                            query.appendCondition(eq(ProcessTraffic.AGENT_ID, 
instanceId));
+                        }
+                        if (lastPingStartTimeBucket > 0) {
+                            
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
lastPingStartTimeBucket));
+                        }
+                        if (lastPingEndTimeBucket > 0) {
+                            
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
lastPingEndTimeBucket));
+                        }
+                        if (profilingSupportStatus != null) {
+                            
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, 
profilingSupportStatus.value()));
+                        }
+                    }
+                });
+
+        final List<Process> processes = new ArrayList<>();
+
+        for (final DataPoint dataPoint : resp.getDataPoints()) {
+            processes.add(buildProcess(dataPoint));
+        }
+
+        return processes;
     }
 
     @Override
-    public long getProcessesCount(String serviceId, String instanceId, String 
agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws 
IOException {
-        return 0;
+    public long getProcessesCount(String serviceId, String instanceId, String 
agentId, ProfilingSupportStatus profilingSupportStatus, long 
lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException {
+        MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+                ImmutableSet.of(ProcessTraffic.NAME,
+                        ProcessTraffic.SERVICE_ID,
+                        ProcessTraffic.INSTANCE_ID,
+                        ProcessTraffic.AGENT_ID,
+                        ProcessTraffic.LAYER,
+                        ProcessTraffic.DETECT_TYPE,
+                        ProcessTraffic.PROPERTIES,
+                        ProcessTraffic.LABELS_JSON,
+                        ProcessTraffic.LAST_PING_TIME_BUCKET,
+                        ProcessTraffic.PROFILING_SUPPORT_STATUS,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(serviceId)) {
+                            
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+                        }
+                        if (StringUtil.isNotEmpty(instanceId)) {
+                            
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+                        }
+                        if (StringUtil.isNotEmpty(agentId)) {
+                            query.appendCondition(eq(ProcessTraffic.AGENT_ID, 
instanceId));
+                        }
+                        if (lastPingStartTimeBucket > 0) {
+                            
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
lastPingStartTimeBucket));
+                        }
+                        if (lastPingEndTimeBucket > 0) {
+                            
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, 
lastPingEndTimeBucket));
+                        }
+                        if (profilingSupportStatus != null) {
+                            
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, 
profilingSupportStatus.value()));
+                        }
+                    }
+                });
+
+        return resp.getDataPoints()
+                .stream()
+                .collect(Collectors.groupingBy((Function<DataPoint, String>) 
dataPoint -> dataPoint.getTagValue(ProcessTraffic.PROPERTIES)))
+                .size();
     }
 
     @Override
     public Process getProcess(String processId) throws IOException {
-        return null;
+        MeasureQueryResponse resp = query(ProcessTraffic.INDEX_NAME,
+                ImmutableSet.of(ProcessTraffic.NAME,
+                        ProcessTraffic.SERVICE_ID,
+                        ProcessTraffic.INSTANCE_ID,
+                        ProcessTraffic.AGENT_ID,
+                        ProcessTraffic.LAYER,
+                        ProcessTraffic.DETECT_TYPE,
+                        ProcessTraffic.PROPERTIES,
+                        ProcessTraffic.LABELS_JSON,
+                        MetadataRegistry.ID),
+                Collections.emptySet(),
+                new QueryBuilder<MeasureQuery>() {
+                    @Override
+                    protected void apply(MeasureQuery query) {
+                        if (StringUtil.isNotEmpty(processId)) {
+                            
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
 processId));
+                        }
+                    }
+                });
+
+        return resp.size() > 0 ? buildProcess(resp.getDataPoints().get(0)) : 
null;
+    }
+
+    private Service buildService(DataPoint dataPoint) {
+        Service service = new Service();
+        service.setId(dataPoint.getTagValue(ServiceTraffic.SERVICE_ID));
+        service.setName(dataPoint.getTagValue(ServiceTraffic.NAME));
+        service.setShortName(dataPoint.getTagValue(ServiceTraffic.SHORT_NAME));
+        service.setGroup(dataPoint.getTagValue(ServiceTraffic.GROUP));
+        service.getLayers().add(Layer.valueOf(((Number) 
dataPoint.getTagValue(ServiceTraffic.LAYER)).intValue()).name());
+        return service;
+    }
+
+    private ServiceInstance buildInstance(DataPoint dataPoint) {
+        ServiceInstance serviceInstance = new ServiceInstance();
+        serviceInstance.setId(dataPoint.getId());
+        serviceInstance.setName(dataPoint.getTagValue(InstanceTraffic.NAME));
+        serviceInstance.setInstanceUUID(dataPoint.getId());
+        serviceInstance.setLayer(Layer.valueOf(((Number) 
dataPoint.getTagValue(InstanceTraffic.LAYER)).intValue()).name());
+
+        final String propString = 
dataPoint.getTagValue(InstanceTraffic.PROPERTIES);
+        JsonObject properties = null;
+        if (StringUtil.isNotEmpty(propString)) {
+            properties = GSON.fromJson(propString, JsonObject.class);
+        }
+        if (properties != null) {
+            for (Map.Entry<String, JsonElement> property : 
properties.entrySet()) {
+                String key = property.getKey();
+                String value = property.getValue().getAsString();
+                if (key.equals(LANGUAGE)) {
+                    serviceInstance.setLanguage(Language.value(value));
+                } else {
+                    serviceInstance.getAttributes().add(new Attribute(key, 
value));
+                }
+            }
+        } else {
+            serviceInstance.setLanguage(Language.UNKNOWN);
+        }
+
+        return serviceInstance;
+    }
+
+    private Endpoint buildEndpoint(DataPoint dataPoint) {
+        Endpoint endpoint = new Endpoint();
+        endpoint.setId(dataPoint.getId());
+        endpoint.setName(dataPoint.getTagValue(EndpointTraffic.NAME));
+        return endpoint;
+    }
+
+    private Process buildProcess(DataPoint dataPoint) {
+        Process process = new Process();
+
+        process.setId(dataPoint.getId());
+        process.setName(dataPoint.getTagValue(ProcessTraffic.NAME));
+        String serviceId = dataPoint.getTagValue(ProcessTraffic.SERVICE_ID);
+        process.setServiceId(serviceId);
+        
process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName());
+        String instanceId = dataPoint.getTagValue(ProcessTraffic.INSTANCE_ID);
+        process.setInstanceId(instanceId);
+        
process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName());
+        process.setLayer(Layer.valueOf(((Number) 
dataPoint.getTagValue(ProcessTraffic.LAYER)).intValue()).name());
+        process.setAgentId(dataPoint.getTagValue(ProcessTraffic.AGENT_ID));
+        process.setDetectType(ProcessDetectType.valueOf(((Number) 
dataPoint.getTagValue(ProcessTraffic.DETECT_TYPE)).intValue()).name());
+
+        String propString = dataPoint.getTagValue(ProcessTraffic.PROPERTIES);
+        if (!Strings.isNullOrEmpty(propString)) {
+            JsonObject properties = GSON.fromJson(propString, 
JsonObject.class);
+            for (Map.Entry<String, JsonElement> property : 
properties.entrySet()) {
+                String key = property.getKey();
+                String value = property.getValue().getAsString();
+                process.getAttributes().add(new Attribute(key, value));
+            }
+        }
+        String labelJson = dataPoint.getTagValue(ProcessTraffic.LABELS_JSON);
+        if (!Strings.isNullOrEmpty(labelJson)) {
+            List<String> labels = GSON.<List<String>>fromJson(labelJson, 
ArrayList.class);
+            process.getLabels().addAll(labels);
+        }
+        return process;
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
similarity index 81%
rename from 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
rename to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 20a300c3b0..fdb7950c0a 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -1,8 +1,9 @@
-package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.banyandb.v1.client.TagAndValue;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
@@ -11,6 +12,8 @@ import 
org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -36,10 +39,10 @@ public class BanyanDBMetricsDAO implements IMetricsDAO {
         MeasureWrite measureWrite = new 
MeasureWrite(schema.getMetadata().getGroup(), // group name
                 model.getName(), // index-name
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), 
model.getDownsampling())); // timestamp
-        final Convert2Storage<MeasureWrite> toStorage = new 
BanyanDBConverter.MeasureToStorage(schema, measureWrite);
+        final BanyanDBConverter.MeasureToStorage toStorage = new 
BanyanDBConverter.MeasureToStorage(schema, measureWrite);
         storageBuilder.entity2Storage(metrics, toStorage);
-        return new InsertRequest() {
-        };
+        toStorage.acceptID(metrics.id());
+        return new BanyanDBMeasureInsertRequest(toStorage.obtain());
     }
 
     @Override
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index 55f2cf9ec7..ef1b7768a6 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -84,7 +84,7 @@ public class BanyanDBAlarmQueryDAO extends 
AbstractBanyanDBDAO implements IAlarm
         for (final RowEntity rowEntity : resp.getElements()) {
             AlarmRecord.Builder builder = new AlarmRecord.Builder();
             AlarmRecord alarmRecord = builder.storage2Entity(
-                    new 
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(AlarmRecord.INDEX_NAME),
 rowEntity)
+                    new 
BanyanDBConverter.StorageToStream(AlarmRecord.INDEX_NAME, rowEntity)
             );
 
             AlarmMessage message = new AlarmMessage();
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index d60484b6f0..b3d8ed77e9 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -142,7 +142,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
         List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence 
- minSequence);
         for (final RowEntity rowEntity : resp.getElements()) {
             ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
-                    new 
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME),
 rowEntity));
+                    new 
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, 
rowEntity));
 
             result.add(record);
         }
@@ -201,7 +201,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
         List<ProfileThreadSnapshotRecord> records = new ArrayList<>();
         for (final RowEntity rowEntity : resp.getElements()) {
             ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
-                    new 
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(ProfileThreadSnapshotRecord.INDEX_NAME),
 rowEntity));
+                    new 
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, 
rowEntity));
 
             records.add(record);
         }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 48443b9e51..1598dc73d1 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -46,7 +46,7 @@ public class BanyanDBRecordDAO implements IRecordDAO {
                 model.getName(), // index-name
                 record.id(), // identity
                 TimeBucket.getTimestamp(record.getTimeBucket(), 
model.getDownsampling())); // timestamp
-        Convert2Storage<StreamWrite> convert2Storage = new 
BanyanDBConverter.StreamToStorage(streamWrite);
+        Convert2Storage<StreamWrite> convert2Storage = new 
BanyanDBConverter.StreamToStorage(schema, streamWrite);
         storageBuilder.entity2Storage(record, convert2Storage);
 
         return new BanyanDBStreamInsertRequest(convert2Storage.obtain());
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
index c3f75c8cef..76a78030e3 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBStorageDAO.java
@@ -31,7 +31,7 @@ import 
org.apache.skywalking.oap.server.core.storage.IRecordDAO;
 import org.apache.skywalking.oap.server.core.storage.StorageDAO;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBManagementDAO;
-import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBMetricsDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetricsDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBNoneStreamDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
 
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index 44749dc0fb..2e3f4f71e7 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -168,7 +168,7 @@ public class BanyanDBTraceQueryDAO extends 
AbstractBanyanDBDAO implements ITrace
 
         for (final RowEntity rowEntity : resp.getElements()) {
             SegmentRecord segmentRecord = new 
SegmentRecord.Builder().storage2Entity(
-                    new 
BanyanDBConverter.StreamToEntity(MetadataRegistry.INSTANCE.findMetadata(SegmentRecord.INDEX_NAME),
 rowEntity));
+                    new 
BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity));
             segmentRecords.add(segmentRecord);
         }
 
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
index 5ae1102e38..cf35457b7a 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
@@ -18,16 +18,27 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 public class ByteUtil {
-    private static final ThreadLocal<ByteBuffer> BYTE_BUFFER = 
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
+    private static final ThreadLocal<ByteBuf> BYTE_BUFFER = 
ThreadLocal.withInitial(() -> Unpooled.buffer(8));
 
     public static Double bytes2Double(byte[] bytes) {
-        return BYTE_BUFFER.get().put(bytes).getDouble();
+        final ByteBuf buf = BYTE_BUFFER.get();
+        try {
+            return buf.writeBytes(bytes).readDouble();
+        } finally {
+            buf.clear();
+        }
     }
 
     public static byte[] double2Bytes(double number) {
-        return BYTE_BUFFER.get().putDouble(number).array();
+        final ByteBuf buf = BYTE_BUFFER.get();
+        try {
+            return buf.writeDouble(number).array();
+        } finally {
+            buf.clear();
+        }
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
similarity index 52%
copy from 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
copy to 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
index 5ae1102e38..a041dc1454 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtil.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/util/ByteUtilTest.java
@@ -18,16 +18,27 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.util;
 
-import java.nio.ByteBuffer;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class ByteUtil {
-    private static final ThreadLocal<ByteBuffer> BYTE_BUFFER = 
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));
-
-    public static Double bytes2Double(byte[] bytes) {
-        return BYTE_BUFFER.get().put(bytes).getDouble();
+public class ByteUtilTest {
+    @Test
+    public void testConvertDoubleAndBackOnce() {
+        double pi = 3.14159;
+        byte[] data = ByteUtil.double2Bytes(pi);
+        Assert.assertEquals(8, data.length);
+        Assert.assertEquals(pi, ByteUtil.bytes2Double(data), 0.00001);
     }
 
-    public static byte[] double2Bytes(double number) {
-        return BYTE_BUFFER.get().putDouble(number).array();
+    @Test
+    public void testConvertDoubleAndBackTwice() {
+        double pi = 3.14159;
+        byte[] binaryPI = ByteUtil.double2Bytes(pi);
+        Assert.assertEquals(8, binaryPI.length);
+        Assert.assertEquals(pi, ByteUtil.bytes2Double(binaryPI), 0.00001);
+        double e = 2.71828;
+        byte[] binaryE = ByteUtil.double2Bytes(e);
+        Assert.assertEquals(8, binaryE.length);
+        Assert.assertEquals(e, ByteUtil.bytes2Double(binaryE), 0.00001);
     }
 }

Reply via email to