This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4d9083e727311789d9982f339d11bcddcf34e635 Author: Megrez Lu <[email protected]> AuthorDate: Tue Nov 30 19:47:54 2021 +0800 complete UITemplate, ProfileTaskRecord and ProfileThreasSnapshot query --- .../plugin/banyandb/BanyanDBStorageClient.java | 5 +- .../banyandb/converter/BasicTraceMapper.java | 12 +++ .../converter/DashboardConfigurationMapper.java | 23 ++++- .../banyandb/converter/ProfileTaskMapper.java | 28 ++++++ .../ProfileThreadSnapshotRecordMapper.java | 27 ++++++ .../plugin/banyandb/converter/RowEntityMapper.java | 6 ++ .../banyandb/converter/SegmentRecordMapper.java | 16 +++- .../stream/BanyanDBProfileTaskQueryDAO.java | 52 +++++++++- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 105 +++++++++++++++++++-- .../plugin/banyandb/stream/BanyanDBRecordDAO.java | 6 +- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 19 ++-- .../stream/BanyanDBUITemplateManagementDAO.java | 33 +++++-- 12 files changed, 297 insertions(+), 35 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index 6ae741f971..3ad4daf918 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -4,6 +4,7 @@ import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.banyandb.v1.client.StreamWrite; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable; @@ -44,8 +45,8 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } - public void write() { - this.client + public void write(StreamWrite streamWrite) { + this.client.write(streamWrite); } public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java index bb43a8080a..b3512158f2 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java @@ -1,10 +1,12 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; +import java.util.Collections; import java.util.List; public class BasicTraceMapper implements RowEntityMapper<BasicTrace> { @@ -22,4 +24,14 @@ public class BasicTraceMapper implements RowEntityMapper<BasicTrace> { trace.setStart(String.valueOf(searchable.get(4).getValue())); return trace; } + + @Override + public List<String> searchableProjection() { + return ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"); + } + + @Override + public List<String> dataProjection() { + return Collections.emptyList(); + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java index f4c4c19beb..3f7df9cb72 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java @@ -1,7 +1,10 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate; +import org.apache.skywalking.oap.server.core.query.enumeration.TemplateType; import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; import org.apache.skywalking.oap.server.library.util.BooleanUtils; @@ -12,9 +15,27 @@ public class DashboardConfigurationMapper implements RowEntityMapper<DashboardCo public DashboardConfiguration map(RowEntity row) { DashboardConfiguration dashboardConfiguration = new DashboardConfiguration(); final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + // name dashboardConfiguration.setName((String) searchable.get(0).getValue()); + // disabled dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue())); - // TODO: convert back from data? + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + // activated + dashboardConfiguration.setActivated(BooleanUtils.valueToBoolean(((Number) data.get(0).getValue()).intValue())); + // configuration + dashboardConfiguration.setConfiguration((String) data.get(1).getValue()); + // type + dashboardConfiguration.setType(TemplateType.forName((String) data.get(2).getValue())); return dashboardConfiguration; } + + @Override + public List<String> searchableProjection() { + return ImmutableList.of(UITemplate.NAME, UITemplate.DISABLED); + } + + @Override + public List<String> dataProjection() { + return ImmutableList.of(UITemplate.ACTIVATED, UITemplate.CONFIGURATION, UITemplate.TYPE); + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java new file mode 100644 index 0000000000..ea1ca09cb0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileTaskMapper.java @@ -0,0 +1,28 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; +import org.apache.skywalking.oap.server.core.query.type.ProfileTask; + +import java.util.Collections; +import java.util.List; + +public class ProfileTaskMapper implements RowEntityMapper<ProfileTask> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(ProfileTaskRecord.SERVICE_ID, ProfileTaskRecord.ENDPOINT_NAME, ProfileTaskRecord.START_TIME, + ProfileTaskRecord.DURATION, ProfileTaskRecord.MIN_DURATION_THRESHOLD, ProfileTaskRecord.DUMP_PERIOD, + ProfileTaskRecord.CREATE_TIME, ProfileTaskRecord.MAX_SAMPLING_COUNT); + } + + @Override + public List<String> dataProjection() { + return Collections.emptyList(); + } + + @Override + public ProfileTask map(RowEntity row) { + return null; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java new file mode 100644 index 0000000000..9a25688132 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/ProfileThreadSnapshotRecordMapper.java @@ -0,0 +1,27 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; + +import java.util.Collections; +import java.util.List; + +public class ProfileThreadSnapshotRecordMapper implements RowEntityMapper<ProfileThreadSnapshotRecord> { + @Override + public List<String> searchableProjection() { + return ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, + ProfileThreadSnapshotRecord.STACK_BINARY); + } + + @Override + public List<String> dataProjection() { + return Collections.emptyList(); + } + + @Override + public ProfileThreadSnapshotRecord map(RowEntity row) { + return null; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java index eb9b5fadba..b2facb3373 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java @@ -2,6 +2,12 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; import org.apache.skywalking.banyandb.v1.client.RowEntity; +import java.util.List; + public interface RowEntityMapper<T> { + List<String> searchableProjection(); + + List<String> dataProjection(); + T map(RowEntity row); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java index 2b20227931..6ea579ff55 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java @@ -1,9 +1,12 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.TagAndValue; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; +import java.util.Collections; import java.util.List; public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> { @@ -20,8 +23,17 @@ public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> { record.setLatency(((Number) searchable.get(5).getValue()).intValue()); record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); final List<TagAndValue<?>> data = row.getTagFamilies().get(1); - // TODO: support binary data in the client SDK - record.setDataBinary((byte[]) data.get(0).getValue()); + record.setDataBinary(((ByteString) data.get(0).getValue()).toByteArray()); return record; } + + @Override + public List<String> searchableProjection() { + return ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"); + } + + @Override + public List<String> dataProjection() { + return Collections.singletonList("data_binary"); + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java index c99d267cbe..34af74eb93 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskQueryDAO.java @@ -1,23 +1,67 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; +import org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTask; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.library.util.StringUtil; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileTaskMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; import java.io.IOException; -import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskRecord} is a stream */ -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { +public class BanyanDBProfileTaskQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileTaskQueryDAO { + private static final RowEntityMapper<ProfileTask> MAPPER = new ProfileTaskMapper(); + + public BanyanDBProfileTaskQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { - return Collections.emptyList(); + final StreamQuery query = new StreamQuery(ProfileTaskRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.setDataProjections(MAPPER.dataProjection()); + + if (StringUtil.isNotEmpty(serviceId)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.SERVICE_ID, serviceId)); + } + + if (StringUtil.isNotEmpty(endpointName)) { + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileTaskRecord.ENDPOINT_NAME, endpointName)); + } + + if (Objects.nonNull(startTimeBucket)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(startTimeBucket))); + } + + if (Objects.nonNull(endTimeBucket)) { + query.appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileTaskRecord.START_TIME, TimeBucket.getTimestamp(endTimeBucket))); + } + + // TODO: why delete? + + if (Objects.nonNull(limit)) { + query.setLimit(limit); + } + + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } @Override public ProfileTask getById(String id) throws IOException { - return null; + // TODO: support id query + throw new UnsupportedOperationException("element id get is not supported"); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index 8a27e885fe..456636f287 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -1,40 +1,133 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.ProfileThreadSnapshotRecordMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; /** * {@link ProfileThreadSnapshotRecord} is a stream */ -public class BanyanDBProfileThreadSnapshotQueryDAO implements IProfileThreadSnapshotQueryDAO { +public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements IProfileThreadSnapshotQueryDAO { + private static final RowEntityMapper<ProfileThreadSnapshotRecord> MAPPER = new ProfileThreadSnapshotRecordMapper(); + private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper(); + private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper(); + + public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException { - return Collections.emptyList(); + final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.TASK_ID, taskId)) + .appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEQUENCE, 0L)); + StreamQueryResponse resp = getClient().query(query); + + final List<String> segmentIDs = new ArrayList<>(resp.size()); + resp.getElements().forEach(elem -> segmentIDs.add(MAPPER.map(elem).getSegmentId())); + if (segmentIDs.isEmpty()) { + return Collections.emptyList(); + } + + // TODO: support `IN` or `OR` logic operation in BanyanDB + List<BasicTrace> basicTraces = new LinkedList<>(); + for (String segmentID : segmentIDs) { + final StreamQuery traceQuery = new StreamQuery(SegmentRecord.INDEX_NAME, BASIC_TRACE_MAPPER.searchableProjection()); + traceQuery.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.SEGMENT_ID, segmentID)); + StreamQueryResponse traceResponse = getClient().query(traceQuery); + basicTraces.addAll(traceResponse.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList())); + } + + // TODO: Sort in DB with DESC + basicTraces = basicTraces.stream() + // comparing start_time + .sorted(Comparator.comparing((Function<BasicTrace, Long>) basicTrace -> Long.parseLong(basicTrace.getStart())) + // and sort in reverse order + .reversed()) + .collect(Collectors.toList()); + return basicTraces; } @Override public int queryMinSequence(String segmentId, long start, long end) throws IOException { - return 0; + return querySequenceWithAgg(AggType.MIN, segmentId, start, end); } @Override public int queryMaxSequence(String segmentId, long start, long end) throws IOException { - return 0; + return querySequenceWithAgg(AggType.MAX, segmentId, start, end); } @Override public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException { - return Collections.emptyList(); + final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) maxSequence)) + .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.SEQUENCE, (long) minSequence)); + query.setDataProjections(MAPPER.dataProjection()); + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } @Override public SegmentRecord getProfiledSegment(String segmentId) throws IOException { - return null; + final StreamQuery query = new StreamQuery(SegmentRecord.INDEX_NAME, SEGMENT_RECORD_MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", SegmentRecord.INDEX_NAME, segmentId)); + query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection()); + StreamQueryResponse resp = getClient().query(query); + return resp.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).findFirst().orElse(null); + } + + private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) { + final StreamQuery query = new StreamQuery(ProfileThreadSnapshotRecord.INDEX_NAME, MAPPER.searchableProjection()); + query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) + .appendCondition(PairQueryCondition.LongQueryCondition.le("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, end)) + .appendCondition(PairQueryCondition.LongQueryCondition.ge("searchable", ProfileThreadSnapshotRecord.DUMP_TIME, start)); + query.setDataProjections(MAPPER.dataProjection()); + + StreamQueryResponse resp = getClient().query(query); + List<ProfileThreadSnapshotRecord> records = resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); + + switch (aggType) { + case MIN: + int minValue = Integer.MAX_VALUE; + for (final ProfileThreadSnapshotRecord record : records) { + int sequence = record.getSequence(); + minValue = Math.min(minValue, sequence); + } + return minValue; + case MAX: + int maxValue = Integer.MIN_VALUE; + for (ProfileThreadSnapshotRecord record : records) { + int sequence = record.getSequence(); + maxValue = Math.max(maxValue, sequence); + } + return maxValue; + default: + throw new IllegalArgumentException("should not reach this line"); + } + } + + enum AggType { + MIN, MAX } } \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index f59bc25f9e..54870d98c6 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -45,11 +45,11 @@ public class BanyanDBRecordDAO implements IRecordDAO { if (SegmentRecord.INDEX_NAME.equals(model.getName())) { SegmentRecord segmentRecord = (SegmentRecord) record; StreamWrite streamWrite = StreamWrite.builder() - .name(BanyanDBSchema.NAME) - .binary(segmentRecord.getDataBinary()) + .name(SegmentRecord.INDEX_NAME) + .dataTag(Tag.binaryField(segmentRecord.getDataBinary())) .timestamp(segmentRecord.getStartTime()) .elementId(segmentRecord.getSegmentId()) - .tags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord))) + .searchableTags(buildFieldObjects(this.storageBuilder.entity2Storage(segmentRecord))) .build(); return new BanyanDBStreamInsertRequest(streamWrite); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index 0a3f0e53b9..a37cfbb54c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -19,9 +19,10 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import org.apache.skywalking.banyandb.v1.client.*; -import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.banyandb.v1.client.TimestampRange; import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; import org.apache.skywalking.oap.server.core.query.type.*; @@ -48,9 +49,6 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss"); - private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"); - private static final List<String> TRACE_ID_QUERY_PROJ = ImmutableList.of("trace_id", "state", "service_id", "service_instance_id", "endpoint_id", "duration", "start_time"); - public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) { super(client); } @@ -59,9 +57,10 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im public TraceBrief queryBasicTraces(long startSecondTB, long endSecondTB, long minDuration, long maxDuration, String serviceId, String serviceInstanceId, String endpointId, String traceId, int limit, int from, TraceState traceState, QueryOrder queryOrder, List<Tag> tags) throws IOException { StreamQuery query; if (startSecondTB != 0 && endSecondTB != 0) { - query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), parseMillisFromEndSecondTB(endSecondTB)), BASIC_QUERY_PROJ); + query = new StreamQuery(BanyanDBSchema.NAME, new TimestampRange(parseMillisFromStartSecondTB(startSecondTB), + parseMillisFromEndSecondTB(endSecondTB)), BASIC_TRACE_MAPPER.searchableProjection()); } else { - query = new StreamQuery(BanyanDBSchema.NAME, BASIC_QUERY_PROJ); + query = new StreamQuery(BanyanDBSchema.NAME, BASIC_TRACE_MAPPER.searchableProjection()); } if (minDuration != 0) { // duration >= minDuration @@ -126,9 +125,9 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException { - StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, TRACE_ID_QUERY_PROJ); + StreamQuery query = new StreamQuery(BanyanDBSchema.NAME, SEGMENT_RECORD_MAPPER.searchableProjection()); query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId)); - query.setDataBinary(true); + query.setDataProjections(SEGMENT_RECORD_MAPPER.dataProjection()); StreamQueryResponse response = this.getClient().query(query); return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList()); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java index 102540c7b5..a4875e65f1 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java @@ -1,9 +1,10 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.banyandb.v1.client.Tag; import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate; import org.apache.skywalking.oap.server.core.query.input.DashboardSetting; import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; @@ -25,27 +26,45 @@ import java.util.stream.Collectors; public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO { private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper(); + private static final long UI_TEMPLATE_TIMESTAMP = 1L; + public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) { super(client); } @Override public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException { - StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of( - UITemplate.NAME, - UITemplate.DISABLED - )); + StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, MAPPER.dataProjection()); query.setLimit(10000); if (!includingDisabled) { query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE)); } + query.setDataProjections(MAPPER.dataProjection()); StreamQueryResponse resp = this.getClient().query(query); return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } @Override public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException { - // TODO: support single write + final UITemplate uiTemplate = setting.toEntity(); + + StreamWrite request = StreamWrite.builder() + .name(UITemplate.INDEX_NAME) + // searchable - name + .searchableTag(Tag.stringField(uiTemplate.getName())) + // searchable - disabled + .searchableTag(Tag.longField(uiTemplate.getDisabled())) + // data - type + .dataTag(Tag.stringField(uiTemplate.getType())) + // data - configuration + .dataTag(Tag.stringField(uiTemplate.getConfiguration())) + // data - activated + .dataTag(Tag.longField(uiTemplate.getActivated())) + .timestamp(UI_TEMPLATE_TIMESTAMP) + .elementId(uiTemplate.id()) + .build(); + getClient().write(request); + return TemplateChangeStatus.builder().status(true).build(); } @Override @@ -55,6 +74,6 @@ public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorage @Override public TemplateChangeStatus disableTemplate(String name) throws IOException { - return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build(); + return TemplateChangeStatus.builder().status(false).message("Can't disable the template").build(); } }
