This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c0bbcf1f68bd98430cdc510a67bd239fa0c4f044 Author: Megrez Lu <[email protected]> AuthorDate: Mon Nov 29 19:32:46 2021 +0800 abstract row entity mapper --- .../plugin/banyandb/BanyanDBStorageClient.java | 4 +++ .../banyandb/converter/BasicTraceMapper.java | 25 +++++++++++++++ .../converter/DashboardConfigurationMapper.java | 20 ++++++++++++ .../plugin/banyandb/converter/RowEntityMapper.java | 7 ++++ .../banyandb/converter/SegmentRecordMapper.java | 27 ++++++++++++++++ .../banyandb/stream/BanyanDBTraceQueryDAO.java | 37 +++++----------------- .../stream/BanyanDBUITemplateManagementDAO.java | 35 +++++++++++++++++--- 7 files changed, 121 insertions(+), 34 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index eed4386e57..6ae741f971 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -44,6 +44,10 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } + public void write() { + this.client + } + public StreamBulkWriteProcessor createBulkProcessor(int maxBulkSize, int flushInterval, int concurrency) { return this.client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java new file mode 100644 index 0000000000..bb43a8080a --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/BasicTraceMapper.java @@ -0,0 +1,25 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.IDManager; +import org.apache.skywalking.oap.server.core.query.type.BasicTrace; + +import java.util.List; + +public class BasicTraceMapper implements RowEntityMapper<BasicTrace> { + @Override + public BasicTrace map(RowEntity row) { + BasicTrace trace = new BasicTrace(); + trace.setSegmentId(row.getId()); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + trace.getTraceIds().add((String) searchable.get(0).getValue()); + trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1); + trace.getEndpointNames().add(IDManager.EndpointID.analysisId( + (String) searchable.get(2).getValue() + ).getEndpointName()); + trace.setDuration(((Long) searchable.get(3).getValue()).intValue()); + trace.setStart(String.valueOf(searchable.get(4).getValue())); + return trace; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java new file mode 100644 index 0000000000..f4c4c19beb --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/DashboardConfigurationMapper.java @@ -0,0 +1,20 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; +import org.apache.skywalking.oap.server.library.util.BooleanUtils; + +import java.util.List; + +public class DashboardConfigurationMapper implements RowEntityMapper<DashboardConfiguration> { + @Override + public DashboardConfiguration map(RowEntity row) { + DashboardConfiguration dashboardConfiguration = new DashboardConfiguration(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + dashboardConfiguration.setName((String) searchable.get(0).getValue()); + dashboardConfiguration.setDisabled(BooleanUtils.valueToBoolean(((Number) searchable.get(1).getValue()).intValue())); + // TODO: convert back from data? + return dashboardConfiguration; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java new file mode 100644 index 0000000000..eb9b5fadba --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/RowEntityMapper.java @@ -0,0 +1,7 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import org.apache.skywalking.banyandb.v1.client.RowEntity; + +public interface RowEntityMapper<T> { + T map(RowEntity row); +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java new file mode 100644 index 0000000000..2b20227931 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/converter/SegmentRecordMapper.java @@ -0,0 +1,27 @@ +package org.apache.skywalking.oap.server.storage.plugin.banyandb.converter; + +import org.apache.skywalking.banyandb.v1.client.RowEntity; +import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; + +import java.util.List; + +public class SegmentRecordMapper implements RowEntityMapper<SegmentRecord> { + @Override + public SegmentRecord map(RowEntity row) { + SegmentRecord record = new SegmentRecord(); + final List<TagAndValue<?>> searchable = row.getTagFamilies().get(0); + record.setSegmentId(row.getId()); + record.setTraceId((String) searchable.get(0).getValue()); + record.setIsError(((Number) searchable.get(1).getValue()).intValue()); + record.setServiceId((String) searchable.get(2).getValue()); + record.setServiceInstanceId((String) searchable.get(3).getValue()); + record.setEndpointId((String) searchable.get(4).getValue()); + record.setLatency(((Number) searchable.get(5).getValue()).intValue()); + record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); + final List<TagAndValue<?>> data = row.getTagFamilies().get(1); + // TODO: support binary data in the client SDK + record.setDataBinary((byte[]) data.get(0).getValue()); + return record; + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index 9fc403afc6..0a3f0e53b9 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -30,6 +30,9 @@ import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBSchema; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.BasicTraceMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.SegmentRecordMapper; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -40,6 +43,9 @@ import java.util.List; import java.util.stream.Collectors; public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> implements ITraceQueryDAO { + private static final RowEntityMapper<SegmentRecord> SEGMENT_RECORD_MAPPER = new SegmentRecordMapper(); + private static final RowEntityMapper<BasicTrace> BASIC_TRACE_MAPPER = new BasicTraceMapper(); + private static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormat.forPattern("yyyyMMddHHmmss"); private static final List<String> BASIC_QUERY_PROJ = ImmutableList.of("trace_id", "state", "endpoint_id", "duration", "start_time"); @@ -114,19 +120,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im StreamQueryResponse response = this.getClient().query(query); TraceBrief brief = new TraceBrief(); brief.setTotal(response.size()); - brief.getTraces().addAll(response.getElements().stream().map(elem -> { - BasicTrace trace = new BasicTrace(); - trace.setSegmentId(elem.getId()); - final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0); - trace.getTraceIds().add((String) searchable.get(0).getValue()); - trace.setError(((Long) searchable.get(1).getValue()).intValue() == 1); - trace.getEndpointNames().add(IDManager.EndpointID.analysisId( - (String) searchable.get(2).getValue() - ).getEndpointName()); - trace.setDuration(((Long) searchable.get(3).getValue()).intValue()); - trace.setStart(String.valueOf(searchable.get(4).getValue())); - return trace; - }).collect(Collectors.toList())); + brief.getTraces().addAll(response.getElements().stream().map(BASIC_TRACE_MAPPER::map).collect(Collectors.toList())); return brief; } @@ -136,22 +130,7 @@ public class BanyanDBTraceQueryDAO extends AbstractDAO<BanyanDBStorageClient> im query.appendCondition(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id", traceId)); query.setDataBinary(true); StreamQueryResponse response = this.getClient().query(query); - return response.getElements().stream().map(elem -> { - SegmentRecord record = new SegmentRecord(); - final List<TagAndValue<?>> searchable = elem.getTagFamilies().get(0); - record.setSegmentId(elem.getId()); - record.setTraceId((String) searchable.get(0).getValue()); - record.setIsError(((Number) searchable.get(1).getValue()).intValue()); - record.setServiceId((String) searchable.get(2).getValue()); - record.setServiceInstanceId((String) searchable.get(3).getValue()); - record.setEndpointId((String) searchable.get(4).getValue()); - record.setLatency(((Number) searchable.get(5).getValue()).intValue()); - record.setStartTime(((Number) searchable.get(6).getValue()).longValue()); - final List<TagAndValue<?>> data = elem.getTagFamilies().get(1); - // TODO: support binary data in the client SDK - record.setDataBinary((byte[]) data.get(0).getValue()); - return record; - }).collect(Collectors.toList()); + return response.getElements().stream().map(SEGMENT_RECORD_MAPPER::map).collect(Collectors.toList()); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java index ad8fed926e..102540c7b5 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBUITemplateManagementDAO.java @@ -1,31 +1,56 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; +import com.google.common.collect.ImmutableList; +import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; +import org.apache.skywalking.banyandb.v1.client.StreamQuery; +import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; +import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate; import org.apache.skywalking.oap.server.core.query.input.DashboardSetting; import org.apache.skywalking.oap.server.core.query.type.DashboardConfiguration; import org.apache.skywalking.oap.server.core.query.type.TemplateChangeStatus; +import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; +import org.apache.skywalking.oap.server.library.util.BooleanUtils; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.DashboardConfigurationMapper; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.converter.RowEntityMapper; import java.io.IOException; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * {@link org.apache.skywalking.oap.server.core.management.ui.template.UITemplate} is a stream */ -public class BanyanDBUITemplateManagementDAO implements UITemplateManagementDAO { +public class BanyanDBUITemplateManagementDAO extends AbstractDAO<BanyanDBStorageClient> implements UITemplateManagementDAO { + private static final RowEntityMapper<DashboardConfiguration> MAPPER = new DashboardConfigurationMapper(); + + public BanyanDBUITemplateManagementDAO(BanyanDBStorageClient client) { + super(client); + } + @Override public List<DashboardConfiguration> getAllTemplates(Boolean includingDisabled) throws IOException { - return Collections.emptyList(); + StreamQuery query = new StreamQuery(UITemplate.INDEX_NAME, ImmutableList.of( + UITemplate.NAME, + UITemplate.DISABLED + )); + query.setLimit(10000); + if (!includingDisabled) { + query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", UITemplate.DISABLED, (long) BooleanUtils.FALSE)); + } + StreamQueryResponse resp = this.getClient().query(query); + return resp.getElements().stream().map(MAPPER::map).collect(Collectors.toList()); } @Override public TemplateChangeStatus addTemplate(DashboardSetting setting) throws IOException { - return TemplateChangeStatus.builder().status(false).message("Can't add a new template").build(); + // TODO: support single write } @Override public TemplateChangeStatus changeTemplate(DashboardSetting setting) throws IOException { - return TemplateChangeStatus.builder().status(false).message("Can't add/update the template").build(); + return TemplateChangeStatus.builder().status(false).message("Can't update the template").build(); } @Override
