This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 3b7bffa22987d2c42975eb2ea1fc05f335953212 Author: Megrez Lu <[email protected]> AuthorDate: Wed May 4 11:45:53 2022 +0800 Support Profiling --- .../storage/plugin/banyandb/BanyanDBConverter.java | 2 + .../plugin/banyandb/BanyanDBNoneStreamDAO.java | 17 ++- .../plugin/banyandb/BanyanDBStorageProvider.java | 4 +- .../storage/plugin/banyandb/MetadataRegistry.java | 1 + .../measure/BanyanDBProfileTaskQueryDAO.java | 38 ------ .../stream/BanyanDBProfileTaskLogQueryDAO.java | 19 ++- .../stream/BanyanDBProfileTaskQueryDAO.java | 127 +++++++++++++++++++++ .../BanyanDBProfileThreadSnapshotQueryDAO.java | 56 ++++----- 8 files changed, 187 insertions(+), 77 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java index 2b2a197cf5..77963e2ff5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java @@ -207,6 +207,8 @@ public class BanyanDBConverter { return TagAndValue.longTagValue((int) value); } else if (JsonObject.class.equals(clazz)) { return TagAndValue.stringTagValue((String) value); + } else if (byte[].class.equals(clazz)) { + return TagAndValue.stringTagValue((String) value); } throw new IllegalStateException(clazz.getSimpleName() + " is not supported"); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java index bd4a5b0926..316e0af413 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java @@ -18,14 +18,19 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.analysis.config.NoneStream; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import java.io.IOException; +@Slf4j public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> implements INoneStreamDAO { private final StorageBuilder<NoneStream> storageBuilder; @@ -36,6 +41,16 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im @Override public void insert(Model model, NoneStream noneStream) throws IOException { - + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model.getName()); + if (schema == null) { + throw new IOException(model.getName() + " is not registered"); + } + StreamWrite streamWrite = new StreamWrite(schema.getMetadata().getGroup(), // group name + model.getName(), // index-name + noneStream.id(), // identity + TimeBucket.getTimestamp(noneStream.getTimeBucket(), model.getDownsampling())); // timestamp + Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(schema, streamWrite); + storageBuilder.entity2Storage(noneStream, convert2Storage); + getClient().write(streamWrite); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index 6bff41e16f..eae732bd8b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -53,7 +53,7 @@ import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedExcepti import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBEventQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO; @@ -110,7 +110,7 @@ public class BanyanDBStorageProvider extends ModuleProvider { this.registerServiceImplementation(IMetadataQueryDAO.class, new BanyanDBMetadataQueryDAO(client)); this.registerServiceImplementation(IAlarmQueryDAO.class, new BanyanDBAlarmQueryDAO(client)); this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client)); - this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO()); + this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO(client)); this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize())); this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client)); this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO(client)); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 6ee58c5283..20264057d1 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -45,6 +45,7 @@ import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentReco import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable; import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; import org.apache.skywalking.oap.server.core.config.ConfigService; +import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord; import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java deleted file mode 100644 index 690bc6640d..0000000000 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; - -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { - @Override - public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { - return Collections.emptyList(); - } - - @Override - public ProfileTask getById(String id) throws IOException { - return null; - } -} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java index de7a97c193..55a5bbddf2 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java @@ -46,8 +46,10 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen @Override public List<ProfileTaskLog> getTaskLogList() throws IOException { StreamQueryResponse resp = query(ProfileTaskLogRecord.INDEX_NAME, - ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID, - ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.OPERATION_TYPE), + ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, + ProfileTaskLogRecord.INSTANCE_ID, + ProfileTaskLogRecord.TASK_ID, + ProfileTaskLogRecord.OPERATION_TYPE), new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { @@ -57,22 +59,19 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen final LinkedList<ProfileTaskLog> tasks = new LinkedList<>(); for (final RowEntity rowEntity : resp.getElements()) { - tasks.add(parseTaskLog(rowEntity)); + tasks.add(buildProfileTaskLog(rowEntity)); } return tasks; } - private ProfileTaskLog parseTaskLog(RowEntity data) { + private ProfileTaskLog buildProfileTaskLog(RowEntity data) { return ProfileTaskLog.builder() .id(data.getId()) .taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID)) - .instanceId( - data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID)) - .operationType(ProfileTaskLogOperationType.parse( - ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue())) - .operationTime( - ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue()) + .instanceId(data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID)) + .operationType(ProfileTaskLogOperationType.parse(((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue())) + .operationTime(((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue()) .build(); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java new file mode 100644 index 0000000000..2f3cca670e --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; + +import com.google.common.collect.ImmutableSet; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskRecord; +import org.apache.skywalking.oap.server.core.query.type.ProfileTask; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class BanyanDBProfileTaskQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskQueryDAO { + public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) { + super(client); + } + + @Override + public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, ImmutableSet.of( + ProfileTaskRecord.SERVICE_ID, + ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, + ProfileTaskRecord.CREATE_TIME, + ProfileTaskRecord.DURATION, + ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, + ProfileTaskRecord.MAX_SAMPLING_COUNT + ), new QueryBuilder<StreamQuery>() { + @Override + protected void apply(StreamQuery query) { + if (StringUtil.isNotEmpty(serviceId)) { + query.and(eq(ProfileTaskRecord.SERVICE_ID, serviceId)); + } + if (StringUtil.isNotEmpty(endpointName)) { + query.and(eq(ProfileTaskRecord.ENDPOINT_NAME, endpointName)); + } + if (startTimeBucket != null) { + query.and(gte(ProfileTaskRecord.TIME_BUCKET, startTimeBucket)); + } + if (endTimeBucket != null) { + query.and(lte(ProfileTaskRecord.TIME_BUCKET, endTimeBucket)); + } + if (limit != null) { + query.setLimit(limit); + } + } + }); + + if (resp.size() == 0) { + return Collections.emptyList(); + } + + List<ProfileTask> profileTasks = new ArrayList<>(resp.size()); + for (final RowEntity entity : resp.getElements()) { + profileTasks.add(buildProfileTask(entity)); + } + + return profileTasks; + } + + @Override + public ProfileTask getById(String id) throws IOException { + StreamQueryResponse resp = query(ProfileTaskRecord.INDEX_NAME, ImmutableSet.of( + ProfileTaskRecord.SERVICE_ID, + ProfileTaskRecord.ENDPOINT_NAME, + ProfileTaskRecord.START_TIME, + ProfileTaskRecord.CREATE_TIME, + ProfileTaskRecord.DURATION, + ProfileTaskRecord.MIN_DURATION_THRESHOLD, + ProfileTaskRecord.DUMP_PERIOD, + ProfileTaskRecord.MAX_SAMPLING_COUNT + ), new QueryBuilder<StreamQuery>() { + @Override + protected void apply(StreamQuery query) { + if (StringUtil.isNotEmpty(id)) { + // TODO: support search by ID + } + // query.setLimit(1); + } + }); + + if (resp.size() == 0) { + return null; + } + + RowEntity first = resp.getElements().stream().filter(e -> id.equals(e.getId())).findFirst().orElse(null); + return first == null ? null : buildProfileTask(first); + } + + private ProfileTask buildProfileTask(RowEntity data) { + return ProfileTask.builder() + .id(data.getId()) + .serviceId(data.getTagValue(ProfileTaskRecord.SERVICE_ID)) + .endpointName(data.getTagValue(ProfileTaskRecord.ENDPOINT_NAME)) + .startTime(((Number) data.getTagValue(ProfileTaskRecord.START_TIME)).longValue()) + .createTime(((Number) data.getTagValue(ProfileTaskRecord.CREATE_TIME)).longValue()) + .duration(((Number) data.getTagValue(ProfileTaskRecord.DURATION)).intValue()) + .minDurationThreshold(((Number) data.getTagValue(ProfileTaskRecord.MIN_DURATION_THRESHOLD)).intValue()) + .dumpPeriod(((Number) data.getTagValue(ProfileTaskRecord.DUMP_PERIOD)).intValue()) + .maxSamplingCount(((Number) data.getTagValue(ProfileTaskRecord.MAX_SAMPLING_COUNT)).intValue()) + .build(); + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index fcf7228de3..68bd752139 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -77,7 +77,13 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i List<BasicTrace> basicTraces = new ArrayList<>(); for (String segmentID : segmentIds) { final StreamQueryResponse segmentRecordResp = query(SegmentRecord.INDEX_NAME, - ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME), + ImmutableSet.of(SegmentRecord.TRACE_ID, + SegmentRecord.IS_ERROR, + SegmentRecord.SERVICE_ID, + SegmentRecord.SERVICE_INSTANCE_ID, + SegmentRecord.ENDPOINT_ID, + SegmentRecord.LATENCY, + SegmentRecord.START_TIME), new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery traceQuery) { @@ -89,7 +95,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i BasicTrace basicTrace = new BasicTrace(); basicTrace.setSegmentId(row.getId()); - basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME))); + basicTrace.setStart(String.valueOf((Number) row.getTagValue(SegmentRecord.START_TIME))); basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId( row.getTagValue(SegmentRecord.ENDPOINT_ID) ).getEndpointName()); @@ -126,8 +132,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i @Override public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException { StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME, - ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, + ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, + ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, + ProfileThreadSnapshotRecord.SEQUENCE, + ProfileThreadSnapshotRecord.TIME_BUCKET, ProfileThreadSnapshotRecord.STACK_BINARY), new QueryBuilder<StreamQuery>() { @Override @@ -142,7 +151,6 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i for (final RowEntity rowEntity : resp.getElements()) { ProfileThreadSnapshotRecord record = this.builder.storage2Entity( new BanyanDBConverter.StorageToStream(ProfileThreadSnapshotRecord.INDEX_NAME, rowEntity)); - result.add(record); } return result; @@ -151,13 +159,19 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i @Override public SegmentRecord getProfiledSegment(String segmentId) throws IOException { StreamQueryResponse resp = query(SegmentRecord.INDEX_NAME, - ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, - SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, - SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY), + ImmutableSet.of(SegmentRecord.TRACE_ID, + SegmentRecord.IS_ERROR, + SegmentRecord.SERVICE_ID, + SegmentRecord.SERVICE_INSTANCE_ID, + SegmentRecord.ENDPOINT_ID, + SegmentRecord.LATENCY, + SegmentRecord.START_TIME, + SegmentRecord.TIME_BUCKET, + SegmentRecord.DATA_BINARY), new QueryBuilder<StreamQuery>() { @Override public void apply(StreamQuery query) { - query.and(eq(SegmentRecord.INDEX_NAME, segmentId)); + query.and(eq(SegmentRecord.SEGMENT_ID, segmentId)); } }); @@ -166,27 +180,17 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i } final RowEntity rowEntity = resp.getElements().iterator().next(); - final SegmentRecord segmentRecord = new SegmentRecord(); - segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID)); - segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID)); - segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID)); - segmentRecord.setStartTime( - ((Number) rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue()); - segmentRecord.setLatency( - ((Number) rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue()); - segmentRecord.setIsError( - ((Number) rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue()); - byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY); - if (dataBinary != null && dataBinary.length > 0) { - segmentRecord.setDataBinary(dataBinary); - } - return segmentRecord; + return new SegmentRecord.Builder().storage2Entity( + new BanyanDBConverter.StorageToStream(SegmentRecord.INDEX_NAME, rowEntity)); } private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) throws IOException { StreamQueryResponse resp = query(ProfileThreadSnapshotRecord.INDEX_NAME, - ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, + ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, + ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, + ProfileThreadSnapshotRecord.SEQUENCE, + ProfileThreadSnapshotRecord.TIME_BUCKET, ProfileThreadSnapshotRecord.STACK_BINARY), new QueryBuilder<StreamQuery>() { @Override
