This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/banyandb-integration-stream by this push:
     new d8ff7df7df Support Profiling
d8ff7df7df is described below

commit d8ff7df7df3a57dcb4303564fbbd06e1da688974
Author: Megrez Lu <[email protected]>
AuthorDate: Wed May 4 11:45:53 2022 +0800

    Support Profiling
---
 .../storage/plugin/banyandb/BanyanDBConverter.java |   2 +
 .../plugin/banyandb/BanyanDBNoneStreamDAO.java     |  17 ++-
 .../plugin/banyandb/BanyanDBStorageProvider.java   |   4 +-
 .../storage/plugin/banyandb/MetadataRegistry.java  |   1 +
 .../measure/BanyanDBProfileTaskQueryDAO.java       |  38 ------
 .../stream/BanyanDBProfileTaskLogQueryDAO.java     |  19 ++-
 .../stream/BanyanDBProfileTaskQueryDAO.java        | 127 +++++++++++++++++++++
 .../BanyanDBProfileThreadSnapshotQueryDAO.java     |  56 ++++-----
 8 files changed, 187 insertions(+), 77 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 2b2a197cf5..77963e2ff5 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -207,6 +207,8 @@ public class BanyanDBConverter {
             return TagAndValue.longTagValue((int) value);
         } else if (JsonObject.class.equals(clazz)) {
             return TagAndValue.stringTagValue((String) value);
+        } else if (byte[].class.equals(clazz)) {
+            return TagAndValue.stringTagValue((String) value);
         }
         throw new IllegalStateException(clazz.getSimpleName() + " is not 
supported");
     }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index bd4a5b0926..316e0af413 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -18,14 +18,19 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 
 import java.io.IOException;
 
+@Slf4j
 public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> 
implements INoneStreamDAO {
     private final StorageBuilder<NoneStream> storageBuilder;
 
@@ -36,6 +41,16 @@ public class BanyanDBNoneStreamDAO extends 
AbstractDAO<BanyanDBStorageClient> im
 
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
-
+        MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(model.getName());
+        if (schema == null) {
+            throw new IOException(model.getName() + " is not registered");
+        }
+        StreamWrite streamWrite = new 
StreamWrite(schema.getMetadata().getGroup(), // group name
+                model.getName(), // index-name
+                noneStream.id(), // identity
+                TimeBucket.getTimestamp(noneStream.getTimeBucket(), 
model.getDownsampling())); // timestamp
+        Convert2Storage<StreamWrite> convert2Storage = new 
BanyanDBConverter.StreamToStorage(schema, streamWrite);
+        storageBuilder.entity2Storage(noneStream, convert2Storage);
+        getClient().write(streamWrite);
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 6bff41e16f..eae732bd8b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -53,7 +53,7 @@ import 
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
-import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
 import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
@@ -110,7 +110,7 @@ public class BanyanDBStorageProvider extends ModuleProvider 
{
         this.registerServiceImplementation(IMetadataQueryDAO.class, new 
BanyanDBMetadataQueryDAO(client));
         this.registerServiceImplementation(IAlarmQueryDAO.class, new 
BanyanDBAlarmQueryDAO(client));
         this.registerServiceImplementation(ILogQueryDAO.class, new 
BanyanDBLogQueryDAO(client));
-        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new 
BanyanDBProfileTaskQueryDAO());
+        this.registerServiceImplementation(IProfileTaskQueryDAO.class, new 
BanyanDBProfileTaskQueryDAO(client));
         this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new 
BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
         
this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new 
BanyanDBProfileThreadSnapshotQueryDAO(client));
         this.registerServiceImplementation(UITemplateManagementDAO.class, new 
BanyanDBUITemplateManagementDAO(client));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 6ee58c5283..20264057d1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -45,6 +45,7 @@ import 
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
 import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
+import 
org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
 import 
org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
deleted file mode 100644
index 690bc6640d..0000000000
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import 
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
-    @Override
-    public List<ProfileTask> getTaskList(String serviceId, String 
endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws 
IOException {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public ProfileTask getById(String id) throws IOException {
-        return null;
-    }
-}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index de7a97c193..55a5bbddf2 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -46,8 +46,10 @@ public class BanyanDBProfileTaskLogQueryDAO extends 
AbstractBanyanDBDAO implemen
     @Override
     public List<ProfileTaskLog> getTaskLogList() throws IOException {
         StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME,
-                ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, 
ProfileTaskLogRecord.INSTANCE_ID,
-                        ProfileTaskLogRecord.TASK_ID, 
ProfileTaskLogRecord.OPERATION_TYPE),
+                ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME,
+                        ProfileTaskLogRecord.INSTANCE_ID,
+                        ProfileTaskLogRecord.TASK_ID,
+                        ProfileTaskLogRecord.OPERATION_TYPE),
                 new QueryBuilder<StreamQuery>() {
                     @Override
                     public void apply(StreamQuery query) {
@@ -57,22 +59,19 @@ public class BanyanDBProfileTaskLogQueryDAO extends 
AbstractBanyanDBDAO implemen
 
         final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
         for (final RowEntity rowEntity : resp.getElements()) {
-            tasks.add(parseTaskLog(rowEntity));
+            tasks.add(buildProfileTaskLog(rowEntity));
         }
 
         return tasks;
     }
 
-    private ProfileTaskLog parseTaskLog(RowEntity data) {
+    private ProfileTaskLog buildProfileTaskLog(RowEntity data) {
         return ProfileTaskLog.builder()
                 .id(data.getId())
                 .taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID))
-                .instanceId(
-                        data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
-                .operationType(ProfileTaskLogOperationType.parse(
-                        ((Number) 
data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
-                .operationTime(
-                        ((Number) 
data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+                .instanceId(data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
+                .operationType(ProfileTaskLogOperationType.parse(((Number) 
data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+                .operationTime(((Number) 
data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
                 .build();
     }
 }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
new file mode 100644
index 0000000000..2f3cca670e
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import 
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO 
implements IProfileTaskQueryDAO {
+    public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
+        super(client);
+    }
+
+    @Override
+    public List<ProfileTask> getTaskList(String serviceId, String 
endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws 
IOException {
+        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, 
ImmutableSet.of(
+                ProfileTaskRecord.SERVICE_ID,
+                ProfileTaskRecord.ENDPOINT_NAME,
+                ProfileTaskRecord.START_TIME,
+                ProfileTaskRecord.CREATE_TIME,
+                ProfileTaskRecord.DURATION,
+                ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                ProfileTaskRecord.DUMP_PERIOD,
+                ProfileTaskRecord.MAX_SAMPLING_COUNT
+        ), new QueryBuilder<StreamQuery>() {
+            @Override
+            protected void apply(StreamQuery query) {
+                if (StringUtil.isNotEmpty(serviceId)) {
+                    query.and(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
+                }
+                if (StringUtil.isNotEmpty(endpointName)) {
+                    query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, 
endpointName));
+                }
+                if (startTimeBucket != null) {
+                    query.and(gte(ProfileTaskRecord.TIME_BUCKET, 
startTimeBucket));
+                }
+                if (endTimeBucket != null) {
+                    query.and(lte(ProfileTaskRecord.TIME_BUCKET, 
endTimeBucket));
+                }
+                if (limit != null) {
+                    query.setLimit(limit);
+                }
+            }
+        });
+
+        if (resp.size() == 0) {
+            return Collections.emptyList();
+        }
+
+        List<ProfileTask> profileTasks = new ArrayList<>(resp.size());
+        for (final RowEntity entity : resp.getElements()) {
+            profileTasks.add(buildProfileTask(entity));
+        }
+
+        return profileTasks;
+    }
+
+    @Override
+    public ProfileTask getById(String id) throws IOException {
+        StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, 
ImmutableSet.of(
+                ProfileTaskRecord.SERVICE_ID,
+                ProfileTaskRecord.ENDPOINT_NAME,
+                ProfileTaskRecord.START_TIME,
+                ProfileTaskRecord.CREATE_TIME,
+                ProfileTaskRecord.DURATION,
+                ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+                ProfileTaskRecord.DUMP_PERIOD,
+                ProfileTaskRecord.MAX_SAMPLING_COUNT
+        ), new QueryBuilder<StreamQuery>() {
+            @Override
+            protected void apply(StreamQuery query) {
+                if (StringUtil.isNotEmpty(id)) {
+                    // TODO: support search by ID
+                }
+                // query.setLimit(1);
+            }
+        });
+
+        if (resp.size() == 0) {
+            return null;
+        }
+
+        RowEntity first = resp.getElements().stream().filter(e -> 
id.equals(e.getId())).findFirst().orElse(null);
+        return first == null ? null : buildProfileTask(first);
+    }
+
+    private ProfileTask buildProfileTask(RowEntity data) {
+        return ProfileTask.builder()
+                .id(data.getId())
+                .serviceId(data.getTagValue(ProfileTaskRecord.SERVICE_ID))
+                
.endpointName(data.getTagValue(ProfileTaskRecord.ENDPOINT_NAME))
+                .startTime(((Number) 
data.getTagValue(ProfileTaskRecord.START_TIME)).longValue())
+                .createTime(((Number) 
data.getTagValue(ProfileTaskRecord.CREATE_TIME)).longValue())
+                .duration(((Number) 
data.getTagValue(ProfileTaskRecord.DURATION)).intValue())
+                .minDurationThreshold(((Number) 
data.getTagValue(ProfileTaskRecord.MIN_DURATION_THRESHOLD)).intValue())
+                .dumpPeriod(((Number) 
data.getTagValue(ProfileTaskRecord.DUMP_PERIOD)).intValue())
+                .maxSamplingCount(((Number) 
data.getTagValue(ProfileTaskRecord.MAX_SAMPLING_COUNT)).intValue())
+                .build();
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index fcf7228de3..68bd752139 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -77,7 +77,13 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
         List<BasicTrace> basicTraces = new ArrayList<>();
         for (String segmentID : segmentIds) {
             final StreamQueryResponse segmentRecordResp = 
query(SegmentRecord.INDEX_NAME,
-                    ImmutableSet.of(SegmentRecord.TRACE_ID, 
SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, 
SegmentRecord.START_TIME),
+                    ImmutableSet.of(SegmentRecord.TRACE_ID,
+                            SegmentRecord.IS_ERROR,
+                            SegmentRecord.SERVICE_ID,
+                            SegmentRecord.SERVICE_INSTANCE_ID,
+                            SegmentRecord.ENDPOINT_ID,
+                            SegmentRecord.LATENCY,
+                            SegmentRecord.START_TIME),
                     new QueryBuilder<StreamQuery>() {
                         @Override
                         public void apply(StreamQuery traceQuery) {
@@ -89,7 +95,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
                 BasicTrace basicTrace = new BasicTrace();
 
                 basicTrace.setSegmentId(row.getId());
-                
basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
+                basicTrace.setStart(String.valueOf((Number) 
row.getTagValue(SegmentRecord.START_TIME)));
                 
basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
                         row.getTagValue(SegmentRecord.ENDPOINT_ID)
                 ).getEndpointName());
@@ -126,8 +132,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
     @Override
     public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, 
int minSequence, int maxSequence) throws IOException {
         StreamQueryResponse resp = 
query(ProfileThreadSnapshotRecord.INDEX_NAME,
-                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, 
ProfileThreadSnapshotRecord.SEGMENT_ID,
-                        ProfileThreadSnapshotRecord.DUMP_TIME, 
ProfileThreadSnapshotRecord.SEQUENCE,
+                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
+                        ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME,
+                        ProfileThreadSnapshotRecord.SEQUENCE,
+                        ProfileThreadSnapshotRecord.TIME_BUCKET,
                         ProfileThreadSnapshotRecord.STACK_BINARY),
                 new QueryBuilder<StreamQuery>() {
                     @Override
@@ -142,7 +151,6 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends 
AbstractBanyanDBDAO i
         for (final RowEntity rowEntity : resp.getElements()) {
             ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
                     new 
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, 
rowEntity));
-
             result.add(record);
         }
         return result;
@@ -151,13 +159,19 @@ public class BanyanDBProfileThreadSnapshotQueryDAO 
extends AbstractBanyanDBDAO i
     @Override
     public SegmentRecord getProfiledSegment(String segmentId) throws 
IOException {
         StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
-                ImmutableSet.of(SegmentRecord.TRACE_ID, 
SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID,
-                        SegmentRecord.SERVICE_INSTANCE_ID, 
SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY,
-                        SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY),
+                ImmutableSet.of(SegmentRecord.TRACE_ID,
+                        SegmentRecord.IS_ERROR,
+                        SegmentRecord.SERVICE_ID,
+                        SegmentRecord.SERVICE_INSTANCE_ID,
+                        SegmentRecord.ENDPOINT_ID,
+                        SegmentRecord.LATENCY,
+                        SegmentRecord.START_TIME,
+                        SegmentRecord.TIME_BUCKET,
+                        SegmentRecord.DATA_BINARY),
                 new QueryBuilder<StreamQuery>() {
                     @Override
                     public void apply(StreamQuery query) {
-                        query.and(eq(SegmentRecord.INDEX_NAME, segmentId));
+                        query.and(eq(SegmentRecord.SEGMENT_ID, segmentId));
                     }
                 });
 
@@ -166,27 +180,17 @@ public class BanyanDBProfileThreadSnapshotQueryDAO 
extends AbstractBanyanDBDAO i
         }
 
         final RowEntity rowEntity = resp.getElements().iterator().next();
-        final SegmentRecord segmentRecord = new SegmentRecord();
-        
segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID));
-        
segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID));
-        
segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID));
-        segmentRecord.setStartTime(
-                ((Number) 
rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue());
-        segmentRecord.setLatency(
-                ((Number) 
rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue());
-        segmentRecord.setIsError(
-                ((Number) 
rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue());
-        byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY);
-        if (dataBinary != null && dataBinary.length > 0) {
-            segmentRecord.setDataBinary(dataBinary);
-        }
-        return segmentRecord;
+        return new SegmentRecord.Builder().storage2Entity(
+                new 
BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity));
     }
 
     private int querySequenceWithAgg(AggType aggType, String segmentId, long 
start, long end) throws IOException {
         StreamQueryResponse resp = 
query(ProfileThreadSnapshotRecord.INDEX_NAME,
-                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, 
ProfileThreadSnapshotRecord.SEGMENT_ID,
-                        ProfileThreadSnapshotRecord.DUMP_TIME, 
ProfileThreadSnapshotRecord.SEQUENCE,
+                ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
+                        ProfileThreadSnapshotRecord.SEGMENT_ID,
+                        ProfileThreadSnapshotRecord.DUMP_TIME,
+                        ProfileThreadSnapshotRecord.SEQUENCE,
+                        ProfileThreadSnapshotRecord.TIME_BUCKET,
                         ProfileThreadSnapshotRecord.STACK_BINARY),
                 new QueryBuilder<StreamQuery>() {
                     @Override

Reply via email to