This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-integration-stream by
this push:
new d8ff7df7df Support Profiling
d8ff7df7df is described below
commit d8ff7df7df3a57dcb4303564fbbd06e1da688974
Author: Megrez Lu <[email protected]>
AuthorDate: Wed May 4 11:45:53 2022 +0800
Support Profiling
---
.../storage/plugin/banyandb/BanyanDBConverter.java | 2 +
.../plugin/banyandb/BanyanDBNoneStreamDAO.java | 17 ++-
.../plugin/banyandb/BanyanDBStorageProvider.java | 4 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 1 +
.../measure/BanyanDBProfileTaskQueryDAO.java | 38 ------
.../stream/BanyanDBProfileTaskLogQueryDAO.java | 19 ++-
.../stream/BanyanDBProfileTaskQueryDAO.java | 127 +++++++++++++++++++++
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 56 ++++-----
8 files changed, 187 insertions(+), 77 deletions(-)
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 2b2a197cf5..77963e2ff5 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -207,6 +207,8 @@ public class BanyanDBConverter {
return TagAndValue.longTagValue((int) value);
} else if (JsonObject.class.equals(clazz)) {
return TagAndValue.stringTagValue((String) value);
+ } else if (byte[].class.equals(clazz)) {
+ return TagAndValue.stringTagValue((String) value);
}
throw new IllegalStateException(clazz.getSimpleName() + " is not
supported");
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index bd4a5b0926..316e0af413 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -18,14 +18,19 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.StreamWrite;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import java.io.IOException;
+@Slf4j
public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient>
implements INoneStreamDAO {
private final StorageBuilder<NoneStream> storageBuilder;
@@ -36,6 +41,16 @@ public class BanyanDBNoneStreamDAO extends
AbstractDAO<BanyanDBStorageClient> im
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
-
+ MetadataRegistry.Schema schema =
MetadataRegistry.INSTANCE.findMetadata(model.getName());
+ if (schema == null) {
+ throw new IOException(model.getName() + " is not registered");
+ }
+ StreamWrite streamWrite = new
StreamWrite(schema.getMetadata().getGroup(), // group name
+ model.getName(), // index-name
+ noneStream.id(), // identity
+ TimeBucket.getTimestamp(noneStream.getTimeBucket(),
model.getDownsampling())); // timestamp
+ Convert2Storage<StreamWrite> convert2Storage = new
BanyanDBConverter.StreamToStorage(schema, streamWrite);
+ storageBuilder.entity2Storage(noneStream, convert2Storage);
+ getClient().write(streamWrite);
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 6bff41e16f..eae732bd8b 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -53,7 +53,7 @@ import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO;
@@ -110,7 +110,7 @@ public class BanyanDBStorageProvider extends ModuleProvider
{
this.registerServiceImplementation(IMetadataQueryDAO.class, new
BanyanDBMetadataQueryDAO(client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new
BanyanDBAlarmQueryDAO(client));
this.registerServiceImplementation(ILogQueryDAO.class, new
BanyanDBLogQueryDAO(client));
- this.registerServiceImplementation(IProfileTaskQueryDAO.class, new
BanyanDBProfileTaskQueryDAO());
+ this.registerServiceImplementation(IProfileTaskQueryDAO.class, new
BanyanDBProfileTaskQueryDAO(client));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new
BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize()));
this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new
BanyanDBProfileThreadSnapshotQueryDAO(client));
this.registerServiceImplementation(UITemplateManagementDAO.class, new
BanyanDBUITemplateManagementDAO(client));
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 6ee58c5283..20264057d1 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -45,6 +45,7 @@ import
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
import org.apache.skywalking.oap.server.core.config.ConfigService;
+import
org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord;
import
org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
deleted file mode 100644
index 690bc6640d..0000000000
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
-
-import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
-import
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO {
- @Override
- public List<ProfileTask> getTaskList(String serviceId, String
endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws
IOException {
- return Collections.emptyList();
- }
-
- @Override
- public ProfileTask getById(String id) throws IOException {
- return null;
- }
-}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
index de7a97c193..55a5bbddf2 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java
@@ -46,8 +46,10 @@ public class BanyanDBProfileTaskLogQueryDAO extends
AbstractBanyanDBDAO implemen
@Override
public List<ProfileTaskLog> getTaskLogList() throws IOException {
StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME,
- ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME,
ProfileTaskLogRecord.INSTANCE_ID,
- ProfileTaskLogRecord.TASK_ID,
ProfileTaskLogRecord.OPERATION_TYPE),
+ ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME,
+ ProfileTaskLogRecord.INSTANCE_ID,
+ ProfileTaskLogRecord.TASK_ID,
+ ProfileTaskLogRecord.OPERATION_TYPE),
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
@@ -57,22 +59,19 @@ public class BanyanDBProfileTaskLogQueryDAO extends
AbstractBanyanDBDAO implemen
final LinkedList<ProfileTaskLog> tasks = new LinkedList<>();
for (final RowEntity rowEntity : resp.getElements()) {
- tasks.add(parseTaskLog(rowEntity));
+ tasks.add(buildProfileTaskLog(rowEntity));
}
return tasks;
}
- private ProfileTaskLog parseTaskLog(RowEntity data) {
+ private ProfileTaskLog buildProfileTaskLog(RowEntity data) {
return ProfileTaskLog.builder()
.id(data.getId())
.taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID))
- .instanceId(
- data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
- .operationType(ProfileTaskLogOperationType.parse(
- ((Number)
data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
- .operationTime(
- ((Number)
data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
+ .instanceId(data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID))
+ .operationType(ProfileTaskLogOperationType.parse(((Number)
data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue()))
+ .operationTime(((Number)
data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue())
.build();
}
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
new file mode 100644
index 0000000000..2f3cca670e
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord;
+import org.apache.skywalking.oap.server.core.query.type.ProfileTask;
+import
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO
implements IProfileTaskQueryDAO {
+ public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ @Override
+ public List<ProfileTask> getTaskList(String serviceId, String
endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws
IOException {
+ StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
ImmutableSet.of(
+ ProfileTaskRecord.SERVICE_ID,
+ ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME,
+ ProfileTaskRecord.CREATE_TIME,
+ ProfileTaskRecord.DURATION,
+ ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD,
+ ProfileTaskRecord.MAX_SAMPLING_COUNT
+ ), new QueryBuilder<StreamQuery>() {
+ @Override
+ protected void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(serviceId)) {
+ query.and(eq(ProfileTaskRecord.SERVICE_ID, serviceId));
+ }
+ if (StringUtil.isNotEmpty(endpointName)) {
+ query.and(eq(ProfileTaskRecord.ENDPOINT_NAME,
endpointName));
+ }
+ if (startTimeBucket != null) {
+ query.and(gte(ProfileTaskRecord.TIME_BUCKET,
startTimeBucket));
+ }
+ if (endTimeBucket != null) {
+ query.and(lte(ProfileTaskRecord.TIME_BUCKET,
endTimeBucket));
+ }
+ if (limit != null) {
+ query.setLimit(limit);
+ }
+ }
+ });
+
+ if (resp.size() == 0) {
+ return Collections.emptyList();
+ }
+
+ List<ProfileTask> profileTasks = new ArrayList<>(resp.size());
+ for (final RowEntity entity : resp.getElements()) {
+ profileTasks.add(buildProfileTask(entity));
+ }
+
+ return profileTasks;
+ }
+
+ @Override
+ public ProfileTask getById(String id) throws IOException {
+ StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME,
ImmutableSet.of(
+ ProfileTaskRecord.SERVICE_ID,
+ ProfileTaskRecord.ENDPOINT_NAME,
+ ProfileTaskRecord.START_TIME,
+ ProfileTaskRecord.CREATE_TIME,
+ ProfileTaskRecord.DURATION,
+ ProfileTaskRecord.MIN_DURATION_THRESHOLD,
+ ProfileTaskRecord.DUMP_PERIOD,
+ ProfileTaskRecord.MAX_SAMPLING_COUNT
+ ), new QueryBuilder<StreamQuery>() {
+ @Override
+ protected void apply(StreamQuery query) {
+ if (StringUtil.isNotEmpty(id)) {
+ // TODO: support search by ID
+ }
+ // query.setLimit(1);
+ }
+ });
+
+ if (resp.size() == 0) {
+ return null;
+ }
+
+ RowEntity first = resp.getElements().stream().filter(e ->
id.equals(e.getId())).findFirst().orElse(null);
+ return first == null ? null : buildProfileTask(first);
+ }
+
+ private ProfileTask buildProfileTask(RowEntity data) {
+ return ProfileTask.builder()
+ .id(data.getId())
+ .serviceId(data.getTagValue(ProfileTaskRecord.SERVICE_ID))
+
.endpointName(data.getTagValue(ProfileTaskRecord.ENDPOINT_NAME))
+ .startTime(((Number)
data.getTagValue(ProfileTaskRecord.START_TIME)).longValue())
+ .createTime(((Number)
data.getTagValue(ProfileTaskRecord.CREATE_TIME)).longValue())
+ .duration(((Number)
data.getTagValue(ProfileTaskRecord.DURATION)).intValue())
+ .minDurationThreshold(((Number)
data.getTagValue(ProfileTaskRecord.MIN_DURATION_THRESHOLD)).intValue())
+ .dumpPeriod(((Number)
data.getTagValue(ProfileTaskRecord.DUMP_PERIOD)).intValue())
+ .maxSamplingCount(((Number)
data.getTagValue(ProfileTaskRecord.MAX_SAMPLING_COUNT)).intValue())
+ .build();
+ }
+}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index fcf7228de3..68bd752139 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -77,7 +77,13 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
List<BasicTrace> basicTraces = new ArrayList<>();
for (String segmentID : segmentIds) {
final StreamQueryResponse segmentRecordResp =
query(SegmentRecord.INDEX_NAME,
- ImmutableSet.of(SegmentRecord.TRACE_ID,
SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY,
SegmentRecord.START_TIME),
+ ImmutableSet.of(SegmentRecord.TRACE_ID,
+ SegmentRecord.IS_ERROR,
+ SegmentRecord.SERVICE_ID,
+ SegmentRecord.SERVICE_INSTANCE_ID,
+ SegmentRecord.ENDPOINT_ID,
+ SegmentRecord.LATENCY,
+ SegmentRecord.START_TIME),
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery traceQuery) {
@@ -89,7 +95,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(row.getId());
-
basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME)));
+ basicTrace.setStart(String.valueOf((Number)
row.getTagValue(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId(
row.getTagValue(SegmentRecord.ENDPOINT_ID)
).getEndpointName());
@@ -126,8 +132,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
@Override
public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId,
int minSequence, int maxSequence) throws IOException {
StreamQueryResponse resp =
query(ProfileThreadSnapshotRecord.INDEX_NAME,
- ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME,
ProfileThreadSnapshotRecord.SEQUENCE,
+ ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
+ ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME,
+ ProfileThreadSnapshotRecord.SEQUENCE,
+ ProfileThreadSnapshotRecord.TIME_BUCKET,
ProfileThreadSnapshotRecord.STACK_BINARY),
new QueryBuilder<StreamQuery>() {
@Override
@@ -142,7 +151,6 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
for (final RowEntity rowEntity : resp.getElements()) {
ProfileThreadSnapshotRecord record = this.builder.storage2Entity(
new
BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME,
rowEntity));
-
result.add(record);
}
return result;
@@ -151,13 +159,19 @@ public class BanyanDBProfileThreadSnapshotQueryDAO
extends AbstractBanyanDBDAO i
@Override
public SegmentRecord getProfiledSegment(String segmentId) throws
IOException {
StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME,
- ImmutableSet.of(SegmentRecord.TRACE_ID,
SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID,
- SegmentRecord.SERVICE_INSTANCE_ID,
SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY,
- SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY),
+ ImmutableSet.of(SegmentRecord.TRACE_ID,
+ SegmentRecord.IS_ERROR,
+ SegmentRecord.SERVICE_ID,
+ SegmentRecord.SERVICE_INSTANCE_ID,
+ SegmentRecord.ENDPOINT_ID,
+ SegmentRecord.LATENCY,
+ SegmentRecord.START_TIME,
+ SegmentRecord.TIME_BUCKET,
+ SegmentRecord.DATA_BINARY),
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
- query.and(eq(SegmentRecord.INDEX_NAME, segmentId));
+ query.and(eq(SegmentRecord.SEGMENT_ID, segmentId));
}
});
@@ -166,27 +180,17 @@ public class BanyanDBProfileThreadSnapshotQueryDAO
extends AbstractBanyanDBDAO i
}
final RowEntity rowEntity = resp.getElements().iterator().next();
- final SegmentRecord segmentRecord = new SegmentRecord();
-
segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID));
-
segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID));
-
segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID));
- segmentRecord.setStartTime(
- ((Number)
rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue());
- segmentRecord.setLatency(
- ((Number)
rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue());
- segmentRecord.setIsError(
- ((Number)
rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue());
- byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY);
- if (dataBinary != null && dataBinary.length > 0) {
- segmentRecord.setDataBinary(dataBinary);
- }
- return segmentRecord;
+ return new SegmentRecord.Builder().storage2Entity(
+ new
BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity));
}
private int querySequenceWithAgg(AggType aggType, String segmentId, long
start, long end) throws IOException {
StreamQueryResponse resp =
query(ProfileThreadSnapshotRecord.INDEX_NAME,
- ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
ProfileThreadSnapshotRecord.SEGMENT_ID,
- ProfileThreadSnapshotRecord.DUMP_TIME,
ProfileThreadSnapshotRecord.SEQUENCE,
+ ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID,
+ ProfileThreadSnapshotRecord.SEGMENT_ID,
+ ProfileThreadSnapshotRecord.DUMP_TIME,
+ ProfileThreadSnapshotRecord.SEQUENCE,
+ ProfileThreadSnapshotRecord.TIME_BUCKET,
ProfileThreadSnapshotRecord.STACK_BINARY),
new QueryBuilder<StreamQuery>() {
@Override