This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9dd5d467cf8459406e841f9f487642b316c19e9b Author: Megrez Lu <[email protected]> AuthorDate: Thu Apr 21 21:23:55 2022 +0800 make code compiled --- .../storage-banyandb-plugin/pom.xml | 2 +- .../storage/plugin/banyandb/BanyanDBConverter.java | 55 ++-- .../plugin/banyandb/BanyanDBIndexInstaller.java | 47 +-- .../plugin/banyandb/BanyanDBMetricsDAO.java | 1 - .../plugin/banyandb/BanyanDBStorageClient.java | 30 +- .../plugin/banyandb/BanyanDBStorageProvider.java | 21 +- .../storage/plugin/banyandb/MetadataRegistry.java | 319 +++++++++++++-------- .../storage/plugin/banyandb/StreamMetadata.java | 71 ----- .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 7 +- .../measure/BanyanDBProfileTaskQueryDAO.java | 2 +- ...kQueryDAO.java => BanyanDBServiceLabelDAO.java} | 12 +- .../banyandb/stream/AbstractBanyanDBDAO.java | 30 +- .../banyandb/stream/BanyanDBAlarmQueryDAO.java | 13 +- .../plugin/banyandb/stream/BanyanDBBatchDAO.java | 1 - .../stream/BanyanDBBrowserLogQueryDAO.java | 16 +- .../BanyanDBEBPFProfilingDataDAO.java} | 15 +- .../BanyanDBEBPFProfilingScheduleQueryDAO.java} | 15 +- .../BanyanDBEBPFProfilingTaskDAO.java} | 16 +- .../banyandb/stream/BanyanDBLogQueryDAO.java | 32 +-- .../stream/BanyanDBProfileTaskLogQueryDAO.java | 28 +- .../BanyanDBProfileThreadSnapshotQueryDAO.java | 70 +++-- .../plugin/banyandb/stream/BanyanDBRecordDAO.java | 9 +- .../banyandb/stream/BanyanDBTraceQueryDAO.java | 27 +- 23 files changed, 396 insertions(+), 443 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml index e2c9ae7817..7546099e4b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/pom.xml @@ -21,7 +21,7 @@ <parent> <artifactId>server-storage-plugin</artifactId> <groupId>org.apache.skywalking</groupId> - <version>9.0.0-SNAPSHOT</version> + <version>9.1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java index 565245ad55..41237f1f3b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java @@ -19,11 +19,13 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.banyandb.v1.client.RowEntity; -import org.apache.skywalking.banyandb.v1.client.SerializableTag; import org.apache.skywalking.banyandb.v1.client.StreamWrite; import org.apache.skywalking.banyandb.v1.client.TagAndValue; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; +import org.apache.skywalking.banyandb.v1.client.metadata.Serializable; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; @@ -35,16 +37,11 @@ import java.util.function.Function; public class BanyanDBConverter { @RequiredArgsConstructor public static class StreamToEntity implements Convert2Entity { - private final StreamMetadata metadata; private final RowEntity rowEntity; @Override public Object get(String fieldName) { - final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName); - if (metadata == null) { - return null; - } - return rowEntity.getValue(metadata.getTagFamilyName(), metadata.getTagSpec().getTagName()); + return rowEntity.getTagValue(fieldName); } @Override @@ -53,9 +50,9 @@ public class BanyanDBConverter { } } + @Slf4j @RequiredArgsConstructor public static class StreamToStorage implements Convert2Storage<StreamWrite> { - private final StreamMetadata metadata; private final StreamWrite streamWrite; @Override @@ -64,41 +61,28 @@ public class BanyanDBConverter { if (Record.TIME_BUCKET.equals(fieldName)) { return; } - final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName); - if (metadata == null) { - return; - } - switch (metadata.getTagFamilyName()) { - case StreamMetadata.TAG_FAMILY_DATA: - this.streamWrite.dataTag(metadata.getTagIndex(), buildTag(fieldValue)); - break; - case StreamMetadata.TAG_FAMILY_SEARCHABLE: - this.streamWrite.searchableTag(metadata.getTagIndex(), buildTag(fieldValue)); - break; - default: - throw new IllegalStateException("tag family is not supported"); + try { + this.streamWrite.tag(fieldName, buildTag(fieldValue)); + } catch (BanyanDBException ex) { + log.error("fail to add tag", ex); } } - private SerializableTag<BanyandbModel.TagValue> buildTag(Object value) { + private Serializable<BanyandbModel.TagValue> buildTag(Object value) { if (Integer.class.equals(value.getClass()) || Long.class.equals(value.getClass())) { - return TagAndValue.longField((long) value); + return TagAndValue.longTagValue((long) value); } else if (String.class.equals(value.getClass())) { - return TagAndValue.stringField((String) value); + return TagAndValue.stringTagValue((String) value); } throw new IllegalStateException(value.getClass() + " is not supported"); } @Override public void accept(String fieldName, byte[] fieldValue) { - final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName); - if (metadata == null) { - return; - } - if (StreamMetadata.TAG_FAMILY_SEARCHABLE.equals(metadata.getTagFamilyName())) { - this.streamWrite.searchableTag(metadata.getTagIndex(), TagAndValue.binaryField((fieldValue))); - } else { - throw new IllegalStateException("binary tag should not be store in the `data` family"); + try { + this.streamWrite.tag(fieldName, TagAndValue.binaryTagValue(fieldValue)); + } catch (BanyanDBException ex) { + log.error("fail to add tag", ex); } } @@ -120,19 +104,12 @@ public class BanyanDBConverter { @Override public Object get(String fieldName) { - final StreamMetadata.TagMetadata metadata = this.metadata.getTagDefinition().get(fieldName); - if (metadata == null) { - return null; - } // TODO: get an unmodifiable view of tag return null; } @Override public StreamWrite obtain() { - if (metadata.isUseIdAsEntity()) { - this.accept(StreamMetadata.ID, this.streamWrite.getElementID()); - } return this.streamWrite; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index f4483b0a14..d894c9990a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -19,9 +19,10 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.banyandb.v1.client.metadata.Catalog; -import org.apache.skywalking.banyandb.v1.client.metadata.Duration; +import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Group; +import org.apache.skywalking.banyandb.v1.client.metadata.Stream; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.storage.StorageException; @@ -43,25 +44,39 @@ public class BanyanDBIndexInstaller extends ModelInstaller { @Override protected boolean isExists(Model model) throws StorageException { - // TODO: get from BanyanDB and make a diff? - return false; + final MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.parseMetadata(model); + try { + final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client; + // first check group + Group g = metadata.getOrCreateGroup(c); + if (g == null) { + throw new StorageException("fail to create group " + metadata.getGroup()); + } + log.info("group {} created", g.name()); + // then check entity schema + return metadata.findRemoteSchema(c).isPresent(); + } catch (BanyanDBException ex) { + throw new StorageException("fail to check existence", ex); + } } @Override protected void createTable(Model model) throws StorageException { - if (model.isTimeSeries() && model.isRecord()) { // stream - StreamMetadata metaInfo = MetadataRegistry.INSTANCE.registerModel(model, this.configService); - if (metaInfo != null) { - log.info("install index {}", model.getName()); - ((BanyanDBStorageClient) client).define( - new Group(metaInfo.getGroup(), Catalog.STREAM, 2, 10, Duration.ofDays(7)) - ); - ((BanyanDBStorageClient) client).define(metaInfo); + try { + if (model.isTimeSeries() && model.isRecord()) { // stream + Stream stream = (Stream) MetadataRegistry.INSTANCE.registerModel(model, this.configService); + if (stream != null) { + log.info("install stream schema {}", model.getName()); + ((BanyanDBStorageClient) client).define(stream); + } + } else if (model.isTimeSeries() && !model.isRecord()) { // measure + // TODO: dynamically register Measure + log.info("skip measure index {}", model.getName()); + } else if (!model.isTimeSeries()) { // UITemplate + log.info("skip property index {}", model.getName()); } - } else if (model.isTimeSeries() && !model.isRecord()) { // measure - log.info("skip measure index {}", model.getName()); - } else if (!model.isTimeSeries()) { // UITemplate - log.info("skip property index {}", model.getName()); + } catch (BanyanDBException ex) { + throw new StorageException("fail to install schema", ex); } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java index af54dd350f..4e84979498 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBMetricsDAO.java @@ -2,7 +2,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import lombok.RequiredArgsConstructor; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; -import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index b1ca206110..dd92c6fd08 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -23,8 +23,7 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamWrite; -import org.apache.skywalking.banyandb.v1.client.metadata.Group; -import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Stream; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; @@ -32,25 +31,17 @@ import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckab import org.apache.skywalking.oap.server.library.util.HealthChecker; import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient}, * which implement {@link Client} and {@link HealthCheckable}. */ public class BanyanDBStorageClient implements Client, HealthCheckable { - private final BanyanDBClient client; - private final Map<String, Group> groupMap; + final BanyanDBClient client; private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker(); public BanyanDBStorageClient(String host, int port) { this.client = new BanyanDBClient(host, port); - this.groupMap = new ConcurrentHashMap<>(); - } - - public Group define(Group group) { - return groupMap.computeIfAbsent(group.getName(), s -> client.define(group)); } @Override @@ -63,22 +54,19 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { this.client.close(); } - public StreamQueryResponse query(StreamQuery streamQuery) { + public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException { try { - StreamQueryResponse response = this.client.queryStreams(streamQuery); + StreamQueryResponse response = this.client.query(streamQuery); this.healthChecker.health(); return response; - } catch (Throwable t) { - healthChecker.unHealth(t); - throw t; + } catch (BanyanDBException ex) { + healthChecker.unHealth(ex); + throw ex; } } - public void define(StreamMetadata streamMetadata) { - Stream stream = this.client.define(streamMetadata.getStream()); - if (stream != null) { - this.client.defineIndexRules(stream, streamMetadata.getIndexRules().toArray(new IndexRule[]{})); - } + public void define(Stream stream) throws BanyanDBException { + this.client.define(stream); } public void write(StreamWrite streamWrite) { diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index 922bf2296a..88a81d3372 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -28,9 +28,13 @@ import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO; import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO; import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO; @@ -50,11 +54,15 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMetadataQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBNetworkAddressAliasDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBServiceLabelDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBTopologyQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBUITemplateManagementDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBAlarmQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBatchDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBBrowserLogQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingDataDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingScheduleQueryDAO; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBEBPFProfilingTaskDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBHistoryDeleteDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBLogQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO; @@ -106,13 +114,16 @@ public class BanyanDBStorageProvider extends ModuleProvider { this.registerServiceImplementation(ILogQueryDAO.class, new BanyanDBLogQueryDAO(client)); this.registerServiceImplementation(IProfileTaskQueryDAO.class, new BanyanDBProfileTaskQueryDAO()); this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new BanyanDBProfileTaskLogQueryDAO(client, this.config.getFetchTaskLogMaxSize())); - this.registerServiceImplementation( - IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client)); + this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new BanyanDBProfileThreadSnapshotQueryDAO(client)); this.registerServiceImplementation(UITemplateManagementDAO.class, new BanyanDBUITemplateManagementDAO()); this.registerServiceImplementation(IEventQueryDAO.class, new BanyanDBEventQueryDAO()); this.registerServiceImplementation(ITopologyQueryDAO.class, new BanyanDBTopologyQueryDAO()); + this.registerServiceImplementation(IEBPFProfilingTaskDAO.class, new BanyanDBEBPFProfilingTaskDAO()); + this.registerServiceImplementation(IEBPFProfilingDataDAO.class, new BanyanDBEBPFProfilingDataDAO()); + this.registerServiceImplementation(IEBPFProfilingScheduleDAO.class, new BanyanDBEBPFProfilingScheduleQueryDAO()); // TODO: metrics + this.registerServiceImplementation(IServiceLabelDAO.class, new BanyanDBServiceLabelDAO()); this.registerServiceImplementation(IHistoryDeleteDAO.class, new BanyanDBHistoryDeleteDAO()); this.registerServiceImplementation(IMetricsQueryDAO.class, new BanyanDBMetricsQueryDAO()); this.registerServiceImplementation(IAggregationQueryDAO.class, new BanyanDBAggregationQueryDAO()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 83af6da867..b8925c7529 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -18,10 +18,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; -import com.google.common.collect.ImmutableSet; -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; -import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase; +import io.grpc.Status; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; +import org.apache.skywalking.banyandb.v1.client.metadata.Catalog; +import org.apache.skywalking.banyandb.v1.client.metadata.Duration; +import org.apache.skywalking.banyandb.v1.client.metadata.Group; import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule; +import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema; import org.apache.skywalking.banyandb.v1.client.metadata.Stream; import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec; import org.apache.skywalking.oap.server.core.alarm.AlarmRecord; @@ -33,6 +40,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.library.util.StringUtil; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -40,173 +48,230 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; +@Slf4j public enum MetadataRegistry { INSTANCE; - private final Map<String, StreamMetadata> streams = new HashMap<>(); + private final Map<String, PartialMetadata> registry = new ConcurrentHashMap<>(); - public StreamMetadata registerModel(Model model, ConfigService configService) { - BanyandbDatabase.Stream pbStream = parseStreamFromModel(model, configService); + public NamedSchema<?> registerModel(Model model, ConfigService configService) { + PartialMetadata partialMetadata = parseMetadata(model); + final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName()); + Map<String, ModelColumn> modelColumnMap = model.getColumns().stream() + .collect(Collectors.toMap(modelColumn -> modelColumn.getColumnName().getStorageName(), Function.identity())); + // parse and set sharding keys + builder.setEntityRelativeTags(parseEntityNames(modelColumnMap)); + // parse and set tag families, which contains tag specs + List<TagFamilySpec> specs = parseTagFamilySpecs(model, partialMetadata, configService); + builder.addTagFamilies(specs); + // parse and add index definition + builder.addIndexes(parseIndexRules(specs, partialMetadata.indexFamily(), modelColumnMap)); - final boolean useIdAsEntity = pbStream.getEntity().getTagNamesCount() == 1 && - StreamMetadata.ID.equals(pbStream.getEntity().getTagNames(0)); + registry.put(model.getName(), partialMetadata); + return builder.build(); + } - final Stream stream = new Stream(pbStream.getMetadata().getGroup(), pbStream.getMetadata().getName()); + public PartialMetadata findSchema(final String name) { + return this.registry.get(name); + } + List<IndexRule> parseIndexRules(List<TagFamilySpec> specs, String indexTagFamily, Map<String, ModelColumn> modelColumnMap) { List<IndexRule> indexRules = new ArrayList<>(); - Set<String> entityNameSet = ImmutableSet.copyOf(pbStream.getEntity().getTagNamesList()); - stream.setEntityTagNames(pbStream.getEntity().getTagNamesList()); - - Map<String, StreamMetadata.TagMetadata> tagDefinition = new HashMap<>(); - - for (BanyandbDatabase.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) { - final TagFamilySpec tagFamilySpec = TagFamilySpec.fromProtobuf(pbTagFamilySpec); - stream.addTagFamilySpec(tagFamilySpec); - - int tagIndex = 0; - for (final TagFamilySpec.TagSpec tagSpec : tagFamilySpec.getTagSpecs()) { - // register tag - tagDefinition.put(tagSpec.getTagName(), new StreamMetadata.TagMetadata(tagFamilySpec.getTagFamilyName(), tagSpec, tagIndex++)); - - // if the tag family equals to "searchable", build index rules - if (tagFamilySpec.getTagFamilyName().equals(StreamMetadata.TAG_FAMILY_SEARCHABLE)) { - // check if this spec exists in the entity names - if (entityNameSet.contains(tagSpec.getTagName())) { - continue; - } - BanyandbDatabase.IndexRule pbIndexRule = parseIndexRuleFromTagSpec(pbStream.getMetadata(), tagSpec); - IndexRule indexRule = IndexRule.fromProtobuf(pbIndexRule); - indexRules.add(indexRule); + for (final TagFamilySpec spec : specs) { + if (!indexTagFamily.equals(spec.tagFamilyName())) { + continue; + } + for (final TagFamilySpec.TagSpec tagSpec : spec.tagSpecs()) { + final String tagName = tagSpec.getTagName(); + // TODO: we need to add support index type in the OAP core + // Currently, we only register INVERTED type + final ModelColumn modelColumn = modelColumnMap.get(tagName); + // if it is null, it must be a user-defined tag + if (modelColumn == null) { + indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES)); + continue; + } + if (modelColumn.getBanyanDBExtension().isGlobalIndexing()) { + indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.GLOBAL)); + } else { + indexRules.add(IndexRule.create(tagName, IndexRule.IndexType.INVERTED, IndexRule.IndexLocation.SERIES)); } } } - - StreamMetadata streamMetadata = StreamMetadata.builder().model(model).stream(stream) - .tagDefinition(tagDefinition) - .indexRules(indexRules) - .group(pbStream.getMetadata().getGroup()) - .useIdAsEntity(useIdAsEntity) - .build(); - streams.put(model.getName(), streamMetadata); - return streamMetadata; - } - - public StreamMetadata findStreamMetadata(final String name) { - return this.streams.get(name); + return indexRules; } - private BanyandbDatabase.Stream parseStreamFromModel(Model model, ConfigService configService) { + /** + * Parse sharding keys from the {@link Model} + * + * @param modelColumnMap the mapping between column storageName and {@link ModelColumn} + * @return a list of column names in strict order + */ + List<String> parseEntityNames(Map<String, ModelColumn> modelColumnMap) { List<ModelColumn> shardingColumns = new ArrayList<>(); + for (final ModelColumn col : modelColumnMap.values()) { + if (col.getBanyanDBExtension().isShardingKey()) { + shardingColumns.add(col); + } + } + return shardingColumns.stream() + .sorted(Comparator.comparingInt(col -> col.getBanyanDBExtension().getShardingKeyIdx())) + .map(col -> col.getColumnName().getName()) + .collect(Collectors.toList()); + } - List<BanyandbDatabase.TagSpec> searchableTagsSpecs = new ArrayList<>(); - List<BanyandbDatabase.TagSpec> dataTagsSpecs = new ArrayList<>(); - for (final ModelColumn modelColumn : model.getColumns()) { - if (modelColumn.getShardingKeyIdx() > -1) { - shardingColumns.add(modelColumn); + List<TagFamilySpec> parseTagFamilySpecs(Model model, PartialMetadata metadata, ConfigService configService) { + Map<String, TagFamilySpec.Builder> builderMap = new HashMap<>(); + for (final ModelColumn col : model.getColumns()) { + final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col); + if (tagSpec == null) { + continue; } - if (modelColumn.isIndexOnly()) { - // skip - } else if (modelColumn.isStorageOnly()) { - dataTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn)); + if (col.shouldIndex()) { + builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpec(tagSpec); } else { - searchableTagsSpecs.add(parseTagSpecFromModelColumn(modelColumn)); + builderMap.computeIfAbsent(metadata.nonIndexFamily(), TagFamilySpec::create).addTagSpec(tagSpec); } } - Set<String> entities = shardingColumns.stream() - .sorted(Comparator.comparingInt(ModelColumn::getShardingKeyIdx)) - .map(modelColumn -> modelColumn.getColumnName().getStorageName()) - .collect(Collectors.toSet()); - - if (entities.isEmpty()) { - // if sharding keys are not defined, we have to use ID - entities = Collections.singleton(StreamMetadata.ID); - // append ID - searchableTagsSpecs.add(BanyandbDatabase.TagSpec.newBuilder() - .setName(StreamMetadata.ID) - .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build()); - } - // add all user-defined indexed tags to the end of the "searchable" family if (SegmentRecord.INDEX_NAME.equals(model.getName())) { - searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableTracesTags())); + builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableTracesTags())); } else if (LogRecord.INDEX_NAME.equals(model.getName())) { - searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableLogsTags())); + builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableLogsTags())); } else if (AlarmRecord.INDEX_NAME.equals(model.getName())) { - searchableTagsSpecs.addAll(parseTagSpecsFromConfiguration(configService.getSearchableAlarmTags())); + builderMap.computeIfAbsent(metadata.indexFamily(), TagFamilySpec::create).addTagSpecs(parseExtraTagSpecs(configService.getSearchableAlarmTags())); } - String group = "default-stream"; - if (model.isSuperDataset()) { - // for superDataset, we should use separate group - group = model.getName() + "-stream"; - } + return builderMap.values().stream().map(TagFamilySpec.Builder::build).collect(Collectors.toList()); + } - return BanyandbDatabase.Stream.newBuilder() - .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder() - .setName(StreamMetadata.TAG_FAMILY_DATA) - .addAllTags(dataTagsSpecs) - .build()) - .addTagFamilies(BanyandbDatabase.TagFamilySpec.newBuilder() - .setName(StreamMetadata.TAG_FAMILY_SEARCHABLE) - .addAllTags(searchableTagsSpecs) - .build()) - .setEntity(BanyandbDatabase.Entity.newBuilder() - .addAllTagNames(entities) - .build()) - .setMetadata(BanyandbCommon.Metadata.newBuilder() - .setGroup(group) - .setName(model.getName()) - .build()) - .build(); + /** + * Extract extra tags from Configuration. + * They are for tags defined for {@link SegmentRecord}, {@link LogRecord} and {@link AlarmRecord}. + * + * @param tags a series of tags joint by comma + * @return a list of {@link org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec.TagSpec} generated from input + */ + private List<TagFamilySpec.TagSpec> parseExtraTagSpecs(String tags) { + if (StringUtil.isEmpty(tags)) { + return Collections.emptyList(); + } + String[] tagsArray = tags.split(","); + if (tagsArray.length == 0) { + return Collections.emptyList(); + } + return Arrays.stream(tagsArray) + .map(TagFamilySpec.TagSpec::newStringTag) + .collect(Collectors.toList()); } - private BanyandbDatabase.TagSpec parseTagSpecFromModelColumn(ModelColumn modelColumn) { + /** + * Parse TagSpec from {@link ModelColumn} + * + * @param modelColumn the column in the model to be parsed + * @return a typed tag spec + */ + @Nullable + private TagFamilySpec.TagSpec parseTagSpec(ModelColumn modelColumn) { final Class<?> clazz = modelColumn.getType(); if (String.class.equals(clazz)) { - return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName()) - .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build(); + return TagFamilySpec.TagSpec.newStringTag(modelColumn.getColumnName().getStorageName()); } else if (int.class.equals(clazz) || long.class.equals(clazz)) { - return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName()) - .setType(BanyandbDatabase.TagType.TAG_TYPE_INT).build(); + return TagFamilySpec.TagSpec.newIntTag(modelColumn.getColumnName().getStorageName()); } else if (byte[].class.equals(clazz) || DataTable.class.equals(clazz)) { - return BanyandbDatabase.TagSpec.newBuilder().setName(modelColumn.getColumnName().getStorageName()) - .setType(BanyandbDatabase.TagType.TAG_TYPE_DATA_BINARY).build(); + return TagFamilySpec.TagSpec.newBinaryTag(modelColumn.getColumnName().getStorageName()); } else { + // TODO: we skip all tags with type of List<String> + if ("tags".equals(modelColumn.getColumnName().getStorageName())) { + return null; + } throw new IllegalStateException("type " + modelColumn.getType().toString() + " is not supported"); } } - private List<BanyandbDatabase.TagSpec> parseTagSpecsFromConfiguration(String tags) { - if (StringUtil.isEmpty(tags)) { - return Collections.emptyList(); + public PartialMetadata parseMetadata(Model model) { + if (model.isRecord()) { + String group = "stream-default"; + if (model.isSuperDataset()) { + // for superDataset, we should use separate group + group = "stream-" + model.getName(); + } + return new PartialMetadata(group, model.getName(), Kind.STREAM); } - String[] tagsArray = tags.split(","); - if (tagsArray.length == 0) { - return Collections.emptyList(); + return new PartialMetadata("measure-default", model.getName(), Kind.MEASURE); + } + + @RequiredArgsConstructor + @Data + public static class PartialMetadata { + private final String group; + private final String name; + private final Kind kind; + + public Optional<NamedSchema<?>> findRemoteSchema(BanyanDBClient client) throws BanyanDBException { + try { + switch (kind) { + case STREAM: + return Optional.ofNullable(client.findStream(this.group, this.name)); + case MEASURE: + return Optional.ofNullable(client.findMeasure(this.group, this.name)); + default: + throw new IllegalStateException("should not reach here"); + } + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.NOT_FOUND)) { + return Optional.empty(); + } + + throw ex; + } + } + + public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException { + Group g = client.findGroup(this.group); + if (g != null) { + return g; + } + switch (kind) { + case STREAM: + return client.define(Group.create(this.group, Catalog.STREAM, 2, 0, Duration.ofDays(7))); + case MEASURE: + return client.define(Group.create(this.group, Catalog.MEASURE, 2, 12, Duration.ofDays(7))); + default: + throw new IllegalStateException("should not reach here"); + } + } + + public String indexFamily() { + switch (kind) { + case MEASURE: + return "default"; + case STREAM: + return "searchable"; + default: + throw new IllegalStateException("should not reach here"); + } + } + + public String nonIndexFamily() { + switch (kind) { + case MEASURE: + return null; + case STREAM: + return "binary"; + default: + throw new IllegalStateException("should not reach here"); + } } - return Arrays.stream(tagsArray) - .map(tagName -> BanyandbDatabase.TagSpec.newBuilder().setName(tagName) - .setType(BanyandbDatabase.TagType.TAG_TYPE_STRING).build()) - .collect(Collectors.toList()); } - private BanyandbDatabase.IndexRule parseIndexRuleFromTagSpec(BanyandbCommon.Metadata metadata, TagFamilySpec.TagSpec tagSpec) { - // In SkyWalking, only "trace_id" should be stored as a global index - BanyandbDatabase.IndexRule.Location loc = "trace_id".equals(tagSpec.getTagName()) ? - BanyandbDatabase.IndexRule.Location.LOCATION_GLOBAL : - BanyandbDatabase.IndexRule.Location.LOCATION_SERIES; - - return BanyandbDatabase.IndexRule.newBuilder() - .setMetadata(BanyandbCommon.Metadata.newBuilder() - .setName(tagSpec.getTagName()).setGroup(metadata.getGroup())) - .setLocation(loc) - .addTags(tagSpec.getTagName()) - // TODO: support TYPE_TREE - .setType(BanyandbDatabase.IndexRule.Type.TYPE_INVERTED) - .build(); + public enum Kind { + MEASURE, STREAM; } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java deleted file mode 100644 index 814f5f62e2..0000000000 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetadata.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.storage.plugin.banyandb; - -import lombok.Builder; -import lombok.Data; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule; -import org.apache.skywalking.banyandb.v1.client.metadata.Stream; -import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec; -import org.apache.skywalking.oap.server.core.storage.model.Model; - -import java.util.List; -import java.util.Map; - -@Getter -@Builder -@Slf4j -public class StreamMetadata { - public static final String TAG_FAMILY_SEARCHABLE = "searchable"; - public static final String TAG_FAMILY_DATA = "data"; - - public static final String ID = "id"; - - private final Model model; - - private final Map<String, TagMetadata> tagDefinition; - - /** - * Group of the stream - */ - private final String group; - /** - * Spec of the stream - */ - private final Stream stream; - /** - * Index rules attached to the stream - */ - private final List<IndexRule> indexRules; - - private final int dataFamilySize; - private final int searchableFamilySize; - - private final boolean useIdAsEntity; - - @Getter - @Data - public static class TagMetadata { - private final String tagFamilyName; - private final TagFamilySpec.TagSpec tagSpec; - private final int tagIndex; - } -} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index 5833c98c56..bff4fc00be 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -55,10 +55,15 @@ public class BanyanDBMetadataQueryDAO implements IMetadataQueryDAO { } @Override - public List<Process> listProcesses(String serviceId, String instanceId) throws IOException { + public List<Process> listProcesses(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { return Collections.emptyList(); } + @Override + public long getProcessesCount(String serviceId, String instanceId, String agentId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { + return 0; + } + @Override public Process getProcess(String processId) throws IOException { return null; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java index 0c6e1ebfd1..690bc6640d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO; import java.io.IOException; import java.util.Collections; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java similarity index 67% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java index 0c6e1ebfd1..295af6f75a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBServiceLabelDAO.java @@ -18,21 +18,15 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO; import java.io.IOException; import java.util.Collections; import java.util.List; -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { +public class BanyanDBServiceLabelDAO implements IServiceLabelDAO { @Override - public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + public List<String> queryAllLabels(String serviceId) throws IOException { return Collections.emptyList(); } - - @Override - public ProfileTask getById(String id) throws IOException { - return null; - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java index 0f093e396b..c40ac35924 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -22,12 +22,14 @@ import org.apache.skywalking.banyandb.v1.client.PairQueryCondition; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.TimestampRange; +import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.oap.server.core.storage.AbstractDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; +import java.io.IOException; import java.time.Instant; -import java.util.List; +import java.util.Set; public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageClient> { private static final Instant UPPER_BOUND = Instant.ofEpochSecond(0, Long.MAX_VALUE); @@ -38,41 +40,45 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli super(client); } - protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, QueryBuilder builder) { + protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, QueryBuilder builder) throws IOException { return this.query(metadata, searchableTags, null, builder); } - protected StreamQueryResponse query(StreamMetadata metadata, List<String> searchableTags, TimestampRange timestampRange, - QueryBuilder builder) { + protected StreamQueryResponse query(MetadataRegistry.PartialMetadata metadata, Set<String> searchableTags, TimestampRange timestampRange, + QueryBuilder builder) throws IOException { final StreamQuery query; if (timestampRange == null) { - query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), LARGEST_TIME_RANGE, searchableTags); + query = new StreamQuery(metadata.getGroup(), metadata.getName(), LARGEST_TIME_RANGE, searchableTags); } else { - query = new StreamQuery(metadata.getGroup(), metadata.getModel().getName(), timestampRange, searchableTags); + query = new StreamQuery(metadata.getGroup(), metadata.getName(), timestampRange, searchableTags); } builder.apply(query); - return getClient().query(query); + try { + return getClient().query(query); + } catch (BanyanDBException ex) { + throw new IOException(ex); + } } protected abstract static class QueryBuilder { abstract void apply(final StreamQuery query); protected PairQueryCondition<Long> eq(String name, long value) { - return PairQueryCondition.LongQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value); + return PairQueryCondition.LongQueryCondition.eq(name, value); } protected PairQueryCondition<Long> lte(String name, long value) { - return PairQueryCondition.LongQueryCondition.le(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value); + return PairQueryCondition.LongQueryCondition.le(name, value); } protected PairQueryCondition<Long> gte(String name, long value) { - return PairQueryCondition.LongQueryCondition.ge(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value); + return PairQueryCondition.LongQueryCondition.ge(name, value); } protected PairQueryCondition<String> eq(String name, String value) { - return PairQueryCondition.StringQueryCondition.eq(StreamMetadata.TAG_FAMILY_SEARCHABLE, name, value); + return PairQueryCondition.StringQueryCondition.eq(name, value); } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java index 0b7693d16c..b2a43abff7 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; @@ -34,7 +34,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; import java.util.List; @@ -45,8 +44,8 @@ import java.util.Objects; * which can be used to build a {@link org.apache.skywalking.oap.server.core.query.type.AlarmMessage} */ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarmQueryDAO { - private final StreamMetadata alarmRecordMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(AlarmRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata alarmRecordMetadata = + MetadataRegistry.INSTANCE.findSchema(AlarmRecord.INDEX_NAME); public BanyanDBAlarmQueryDAO(BanyanDBStorageClient client) { super(client); @@ -60,13 +59,11 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm } StreamQueryResponse resp = query(alarmRecordMetadata, - ImmutableList.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME), + ImmutableSet.of(AlarmRecord.SCOPE, AlarmRecord.START_TIME, AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA), tsRange, new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(ImmutableList.of(AlarmRecord.ID0, AlarmRecord.ID1, AlarmRecord.ALARM_MESSAGE, AlarmRecord.TAGS_RAW_DATA)); - if (Objects.nonNull(scopeId)) { query.appendCondition(eq(AlarmRecord.SCOPE, (long) scopeId)); } @@ -90,7 +87,7 @@ public class BanyanDBAlarmQueryDAO extends AbstractBanyanDBDAO implements IAlarm for (final RowEntity rowEntity : resp.getElements()) { AlarmRecord.Builder builder = new AlarmRecord.Builder(); AlarmRecord alarmRecord = builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(this.alarmRecordMetadata, rowEntity) + new BanyanDBConverter.StreamToEntity(rowEntity) ); AlarmMessage message = new AlarmMessage(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java index 040b031828..e17b8b66ac 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBatchDAO.java @@ -31,7 +31,6 @@ import java.util.concurrent.CompletableFuture; public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO { private StreamBulkWriteProcessor bulkProcessor; - private final int maxBulkSize; private final int flushInterval; private final int concurrency; diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java index d384210cd9..2c412cf7dc 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamQuery; @@ -34,18 +34,16 @@ import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; -import java.util.Collections; import java.util.Objects; /** * {@link org.apache.skywalking.oap.server.core.browser.manual.errorlog.BrowserErrorLogRecord} is a stream */ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements IBrowserLogQueryDAO { - private final StreamMetadata browserErrorLogRecordMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(BrowserErrorLogRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata browserErrorLogRecordMetadata = + MetadataRegistry.INSTANCE.findSchema(BrowserErrorLogRecord.INDEX_NAME); public BanyanDBBrowserLogQueryDAO(BanyanDBStorageClient client) { super(client); @@ -58,13 +56,12 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I tsRange = new TimestampRange(TimeBucket.getTimestamp(startSecondTB), TimeBucket.getTimestamp(endSecondTB)); } - StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableList.of(BrowserErrorLogRecord.SERVICE_ID, + StreamQueryResponse resp = query(browserErrorLogRecordMetadata, ImmutableSet.of(BrowserErrorLogRecord.SERVICE_ID, BrowserErrorLogRecord.SERVICE_VERSION_ID, BrowserErrorLogRecord.PAGE_PATH_ID, - BrowserErrorLogRecord.ERROR_CATEGORY), tsRange, new QueryBuilder() { + BrowserErrorLogRecord.ERROR_CATEGORY, BrowserErrorLogRecord.DATA_BINARY), tsRange, new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(Collections.singletonList(BrowserErrorLogRecord.DATA_BINARY)); query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId)); if (StringUtil.isNotEmpty(serviceVersionId)) { @@ -88,8 +85,7 @@ public class BanyanDBBrowserLogQueryDAO extends AbstractBanyanDBDAO implements I logs.setTotal(resp.size()); for (final RowEntity rowEntity : resp.getElements()) { - final byte[] dataBinary = - rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, BrowserErrorLogRecord.DATA_BINARY); + final byte[] dataBinary = rowEntity.getTagValue(BrowserErrorLogRecord.DATA_BINARY); if (dataBinary != null && dataBinary.length > 0) { BrowserErrorLog log = parserDataBinary(dataBinary); logs.getLogs().add(log); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java similarity index 67% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java index 0c6e1ebfd1..4036fa771c 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingDataDAO.java @@ -16,23 +16,18 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO; import java.io.IOException; import java.util.Collections; import java.util.List; -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { +public class BanyanDBEBPFProfilingDataDAO implements IEBPFProfilingDataDAO { @Override - public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + public List<EBPFProfilingDataRecord> queryData(List<String> scheduleIdList, long beginTime, long endTime) throws IOException { return Collections.emptyList(); } - - @Override - public ProfileTask getById(String id) throws IOException { - return null; - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java similarity index 67% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java index 0c6e1ebfd1..e6e4d529e4 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingScheduleQueryDAO.java @@ -16,23 +16,18 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingSchedule; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO; import java.io.IOException; import java.util.Collections; import java.util.List; -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { +public class BanyanDBEBPFProfilingScheduleQueryDAO implements IEBPFProfilingScheduleDAO { @Override - public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + public List<EBPFProfilingSchedule> querySchedules(String taskId, long startTimeBucket, long endTimeBucket) throws IOException { return Collections.emptyList(); } - - @Override - public ProfileTask getById(String id) throws IOException { - return null; - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java similarity index 66% copy from oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java copy to oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java index 0c6e1ebfd1..b0a59f3251 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBProfileTaskQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBEBPFProfilingTaskDAO.java @@ -16,23 +16,19 @@ * */ -package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; +package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import org.apache.skywalking.oap.server.core.query.type.ProfileTask; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; +import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType; +import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask; +import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO; import java.io.IOException; import java.util.Collections; import java.util.List; -public class BanyanDBProfileTaskQueryDAO implements IProfileTaskQueryDAO { +public class BanyanDBEBPFProfilingTaskDAO implements IEBPFProfilingTaskDAO { @Override - public List<ProfileTask> getTaskList(String serviceId, String endpointName, Long startTimeBucket, Long endTimeBucket, Integer limit) throws IOException { + public List<EBPFProfilingTask> queryTasks(List<String> serviceIdList, EBPFProfilingTargetType targetType, long taskStartTime, long latestUpdateTime) throws IOException { return Collections.emptyList(); } - - @Override - public ProfileTask getById(String id) throws IOException { - return null; - } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java index 5cef08a726..c6097539c9 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.skywalking.apm.network.logging.v3.LogTags; import org.apache.skywalking.banyandb.v1.client.RowEntity; @@ -41,7 +41,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; import java.util.List; @@ -51,8 +50,8 @@ import java.util.Objects; * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream */ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO { - private final StreamMetadata logRecordMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(LogRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata logRecordMetadata = + MetadataRegistry.INSTANCE.findSchema(LogRecord.INDEX_NAME); public BanyanDBLogQueryDAO(BanyanDBStorageClient client) { super(client); @@ -66,8 +65,6 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer final QueryBuilder query = new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(ImmutableList.of(AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, AbstractLogRecord.TAGS_RAW_DATA)); - if (StringUtil.isNotEmpty(serviceId)) { query.appendCondition(eq(AbstractLogRecord.SERVICE_ID, serviceId)); } @@ -105,32 +102,31 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQuer } StreamQueryResponse resp = query(logRecordMetadata, - ImmutableList.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID, + ImmutableSet.of(AbstractLogRecord.SERVICE_ID, AbstractLogRecord.SERVICE_INSTANCE_ID, AbstractLogRecord.ENDPOINT_ID, AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID, - AbstractLogRecord.SPAN_ID), tsRange, query); + AbstractLogRecord.SPAN_ID, AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT, + AbstractLogRecord.TAGS_RAW_DATA), tsRange, query); Logs logs = new Logs(); logs.setTotal(resp.size()); for (final RowEntity rowEntity : resp.getElements()) { Log log = new Log(); - log.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_ID)); + log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID)); log.setServiceInstanceId( - rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.SERVICE_INSTANCE_ID)); + rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID)); log.setEndpointId( - rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.ENDPOINT_ID)); + rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID)); if (log.getEndpointId() != null) { log.setEndpointName( IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName()); } - log.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.TRACE_ID)); - log.setTimestamp(((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, - AbstractLogRecord.TIMESTAMP)).longValue()); + log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID)); + log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue()); log.setContentType(ContentType.instanceOf( - ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, - AbstractLogRecord.CONTENT_TYPE)).intValue())); - log.setContent(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, AbstractLogRecord.CONTENT)); - byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, AbstractLogRecord.TAGS_RAW_DATA); + ((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue())); + log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT)); + byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA); if (dataBinary != null && dataBinary.length > 0) { parserDataBinary(dataBinary, log.getTags()); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java index 4411957f69..6749118a25 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileTaskLogQueryDAO.java @@ -18,28 +18,27 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; -import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord; +import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskLogRecord; import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLog; import org.apache.skywalking.oap.server.core.query.type.ProfileTaskLogOperationType; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; import java.util.LinkedList; import java.util.List; /** - * {@link org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord} is a stream + * {@link ProfileTaskLogRecord} is a stream */ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implements IProfileTaskLogQueryDAO { - private final StreamMetadata profileTaskLogRecord = - MetadataRegistry.INSTANCE.findStreamMetadata(ProfileTaskLogRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata profileTaskLogRecord = + MetadataRegistry.INSTANCE.findSchema(ProfileTaskLogRecord.INDEX_NAME); private final int queryMaxSize; @@ -51,12 +50,11 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen @Override public List<ProfileTaskLog> getTaskLogList() throws IOException { StreamQueryResponse resp = query(profileTaskLogRecord, - ImmutableList.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID), + ImmutableSet.of(ProfileTaskLogRecord.OPERATION_TIME, ProfileTaskLogRecord.INSTANCE_ID, + ProfileTaskLogRecord.TASK_ID, ProfileTaskLogRecord.OPERATION_TYPE), new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(ImmutableList.of(ProfileTaskLogRecord.TASK_ID, - ProfileTaskLogRecord.OPERATION_TYPE)); query.setLimit(BanyanDBProfileTaskLogQueryDAO.this.queryMaxSize); } }); @@ -72,15 +70,13 @@ public class BanyanDBProfileTaskLogQueryDAO extends AbstractBanyanDBDAO implemen private ProfileTaskLog parseTaskLog(RowEntity data) { return ProfileTaskLog.builder() .id(data.getId()) - .taskId(data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.TASK_ID)) + .taskId(data.getTagValue(ProfileTaskLogRecord.TASK_ID)) .instanceId( - data.getValue(StreamMetadata.TAG_FAMILY_DATA, ProfileTaskLogRecord.INSTANCE_ID)) + data.getTagValue(ProfileTaskLogRecord.INSTANCE_ID)) .operationType(ProfileTaskLogOperationType.parse( - ((Number) data.getValue(StreamMetadata.TAG_FAMILY_DATA, - ProfileTaskLogRecord.OPERATION_TYPE)).intValue())) + ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TYPE)).intValue())) .operationTime( - ((Number) data.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, - ProfileTaskLogRecord.OPERATION_TIME)).longValue()) + ((Number) data.getTagValue(ProfileTaskLogRecord.OPERATION_TIME)).longValue()) .build(); } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java index 4c6ac7ddcd..a0cacd0c39 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java @@ -18,20 +18,19 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord; -import org.apache.skywalking.oap.server.core.profile.ProfileThreadSnapshotRecord; +import org.apache.skywalking.oap.server.core.profiling.trace.ProfileThreadSnapshotRecord; import org.apache.skywalking.oap.server.core.query.type.BasicTrace; -import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; +import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO; import org.apache.skywalking.oap.server.library.util.BooleanUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; import java.util.ArrayList; @@ -49,11 +48,11 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i protected final ProfileThreadSnapshotRecord.Builder builder = new ProfileThreadSnapshotRecord.Builder(); - private final StreamMetadata profileThreadSnapshotMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(ProfileThreadSnapshotRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata profileThreadSnapshotMetadata = + MetadataRegistry.INSTANCE.findSchema(ProfileThreadSnapshotRecord.INDEX_NAME); - private final StreamMetadata segmentRecordMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata segmentRecordMetadata = + MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME); public BanyanDBProfileThreadSnapshotQueryDAO(BanyanDBStorageClient client) { super(client); @@ -62,7 +61,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i @Override public List<BasicTrace> queryProfiledSegments(String taskId) throws IOException { StreamQueryResponse resp = query(profileThreadSnapshotMetadata, - ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE), new QueryBuilder() { @Override @@ -78,14 +77,14 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i final List<String> segmentIds = new LinkedList<>(); for (final RowEntity rowEntity : resp.getElements()) { - segmentIds.add(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, ProfileThreadSnapshotRecord.SEGMENT_ID)); + segmentIds.add(rowEntity.getTagValue(ProfileThreadSnapshotRecord.SEGMENT_ID)); } // TODO: support `IN` or `OR` logic operation in BanyanDB List<BasicTrace> basicTraces = new ArrayList<>(); for (String segmentID : segmentIds) { final StreamQueryResponse segmentRecordResp = query(segmentRecordMetadata, - ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME), + ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME), new QueryBuilder() { @Override public void apply(StreamQuery traceQuery) { @@ -97,15 +96,15 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i BasicTrace basicTrace = new BasicTrace(); basicTrace.setSegmentId(row.getId()); - basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME))); + basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME))); basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId( - row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID) + row.getTagValue(SegmentRecord.ENDPOINT_ID) ).getEndpointName()); - basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue()); + basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue()); basicTrace.setError(BooleanUtils.valueToBoolean( - ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue() + ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue() )); - basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID)); + basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID)); basicTraces.add(basicTrace); } @@ -134,13 +133,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i @Override public List<ProfileThreadSnapshotRecord> queryRecords(String segmentId, int minSequence, int maxSequence) throws IOException { StreamQueryResponse resp = query(profileThreadSnapshotMetadata, - ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE), + ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, + ProfileThreadSnapshotRecord.STACK_BINARY), new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)); - query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) .appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence)) .appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence)); @@ -150,7 +148,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i List<ProfileThreadSnapshotRecord> result = new ArrayList<>(maxSequence - minSequence); for (final RowEntity rowEntity : resp.getElements()) { ProfileThreadSnapshotRecord record = this.builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity)); + new BanyanDBConverter.StreamToEntity(rowEntity)); result.add(record); } @@ -160,11 +158,12 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i @Override public SegmentRecord getProfiledSegment(String segmentId) throws IOException { StreamQueryResponse resp = query(segmentRecordMetadata, - ImmutableList.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, SegmentRecord.START_TIME), + ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, + SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, + SegmentRecord.START_TIME, SegmentRecord.DATA_BINARY), new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY)); query.appendCondition(eq(SegmentRecord.INDEX_NAME, segmentId)); } }); @@ -175,31 +174,30 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i final RowEntity rowEntity = resp.getElements().iterator().next(); final SegmentRecord segmentRecord = new SegmentRecord(); - segmentRecord.setSegmentId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SEGMENT_ID)); - segmentRecord.setTraceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID)); - segmentRecord.setServiceId(rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.SERVICE_ID)); + segmentRecord.setSegmentId(rowEntity.getTagValue(SegmentRecord.SEGMENT_ID)); + segmentRecord.setTraceId(rowEntity.getTagValue(SegmentRecord.TRACE_ID)); + segmentRecord.setServiceId(rowEntity.getTagValue(SegmentRecord.SERVICE_ID)); segmentRecord.setStartTime( - ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME)).longValue()); + ((Number) rowEntity.getTagValue(SegmentRecord.START_TIME)).longValue()); segmentRecord.setLatency( - ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue()); + ((Number) rowEntity.getTagValue(SegmentRecord.LATENCY)).intValue()); segmentRecord.setIsError( - ((Number) rowEntity.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue()); - byte[] dataBinary = rowEntity.getValue(StreamMetadata.TAG_FAMILY_DATA, SegmentRecord.DATA_BINARY); + ((Number) rowEntity.getTagValue(SegmentRecord.IS_ERROR)).intValue()); + byte[] dataBinary = rowEntity.getTagValue(SegmentRecord.DATA_BINARY); if (dataBinary != null && dataBinary.length > 0) { segmentRecord.setDataBinary(dataBinary); } return segmentRecord; } - private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) { + private int querySequenceWithAgg(AggType aggType, String segmentId, long start, long end) throws IOException { StreamQueryResponse resp = query(profileThreadSnapshotMetadata, - ImmutableList.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, - ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE), + ImmutableSet.of(ProfileThreadSnapshotRecord.TASK_ID, ProfileThreadSnapshotRecord.SEGMENT_ID, + ProfileThreadSnapshotRecord.DUMP_TIME, ProfileThreadSnapshotRecord.SEQUENCE, + ProfileThreadSnapshotRecord.STACK_BINARY), new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(Collections.singletonList(ProfileThreadSnapshotRecord.STACK_BINARY)); - query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId)) .appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end)) .appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start)); @@ -209,7 +207,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends AbstractBanyanDBDAO i List<ProfileThreadSnapshotRecord> records = new ArrayList<>(); for (final RowEntity rowEntity : resp.getElements()) { ProfileThreadSnapshotRecord record = this.builder.storage2Entity( - new BanyanDBConverter.StreamToEntity(profileThreadSnapshotMetadata, rowEntity)); + new BanyanDBConverter.StreamToEntity(rowEntity)); records.add(record); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index a1cf73edcd..3fe0e03ef7 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -29,7 +29,6 @@ import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; @@ -39,17 +38,15 @@ public class BanyanDBRecordDAO implements IRecordDAO { @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { - StreamMetadata metadata = MetadataRegistry.INSTANCE.findStreamMetadata(model.getName()); + MetadataRegistry.PartialMetadata metadata = MetadataRegistry.INSTANCE.findSchema(model.getName()); if (metadata == null) { throw new IOException(model.getName() + " is not registered"); } StreamWrite streamWrite = new StreamWrite(metadata.getGroup(), // group name model.getName(), // index-name record.id(), // identity - TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling()), // timestamp - metadata.getDataFamilySize(), // length of the "data" tag family - metadata.getSearchableFamilySize()); // length of the "searchable" tag family - Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(metadata, streamWrite); + TimeBucket.getTimestamp(record.getTimeBucket(), model.getDownsampling())); // timestamp + Convert2Storage<StreamWrite> convert2Storage = new BanyanDBConverter.StreamToStorage(streamWrite); storageBuilder.entity2Storage(record, convert2Storage); return new BanyanDBStreamInsertRequest(convert2Storage.obtain()); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index fc0d458c29..e149b5ab27 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.skywalking.banyandb.v1.client.RowEntity; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; @@ -39,7 +39,6 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter; import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient; import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.StreamMetadata; import java.io.IOException; import java.util.ArrayList; @@ -47,8 +46,8 @@ import java.util.Collections; import java.util.List; public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITraceQueryDAO { - private final StreamMetadata segmentRecordMetadata = - MetadataRegistry.INSTANCE.findStreamMetadata(SegmentRecord.INDEX_NAME); + private final MetadataRegistry.PartialMetadata segmentRecordMetadata = + MetadataRegistry.INSTANCE.findSchema(SegmentRecord.INDEX_NAME); public BanyanDBTraceQueryDAO(BanyanDBStorageClient client) { super(client); @@ -117,7 +116,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace } StreamQueryResponse resp = query(segmentRecordMetadata, - ImmutableList.of(SegmentRecord.TRACE_ID, // 0 - trace_id + ImmutableSet.of(SegmentRecord.TRACE_ID, // 0 - trace_id SegmentRecord.IS_ERROR, // 1 - is_error SegmentRecord.SERVICE_ID, // 2 - service_id SegmentRecord.SERVICE_INSTANCE_ID, // 3 - service_instance_id @@ -133,15 +132,15 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace BasicTrace basicTrace = new BasicTrace(); basicTrace.setSegmentId(row.getId()); - basicTrace.setStart(String.valueOf(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.START_TIME))); + basicTrace.setStart(String.valueOf(row.getTagValue(SegmentRecord.START_TIME))); basicTrace.getEndpointNames().add(IDManager.EndpointID.analysisId( - row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.ENDPOINT_ID) + row.getTagValue(SegmentRecord.ENDPOINT_ID) ).getEndpointName()); - basicTrace.setDuration(((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.LATENCY)).intValue()); + basicTrace.setDuration(((Number) row.getTagValue(SegmentRecord.LATENCY)).intValue()); basicTrace.setError(BooleanUtils.valueToBoolean( - ((Number) row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.IS_ERROR)).intValue() + ((Number) row.getTagValue(SegmentRecord.IS_ERROR)).intValue() )); - basicTrace.getTraceIds().add(row.getValue(StreamMetadata.TAG_FAMILY_SEARCHABLE, SegmentRecord.TRACE_ID)); + basicTrace.getTraceIds().add(row.getTagValue(SegmentRecord.TRACE_ID)); traceBrief.getTraces().add(basicTrace); } @@ -152,17 +151,17 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace @Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException { StreamQueryResponse resp = query(segmentRecordMetadata, - ImmutableList.of(SegmentRecord.TRACE_ID, + ImmutableSet.of(SegmentRecord.TRACE_ID, SegmentRecord.IS_ERROR, SegmentRecord.SERVICE_ID, SegmentRecord.SERVICE_INSTANCE_ID, SegmentRecord.ENDPOINT_ID, SegmentRecord.LATENCY, - SegmentRecord.START_TIME), + SegmentRecord.START_TIME, + SegmentRecord.DATA_BINARY), new QueryBuilder() { @Override public void apply(StreamQuery query) { - query.setDataProjections(Collections.singletonList(SegmentRecord.DATA_BINARY)); query.appendCondition(eq(SegmentRecord.TRACE_ID, traceId)); } }); @@ -171,7 +170,7 @@ public class BanyanDBTraceQueryDAO extends AbstractBanyanDBDAO implements ITrace for (final RowEntity rowEntity : resp.getElements()) { SegmentRecord segmentRecord = new SegmentRecord.Builder().storage2Entity( - new BanyanDBConverter.StreamToEntity(segmentRecordMetadata, rowEntity)); + new BanyanDBConverter.StreamToEntity(rowEntity)); segmentRecords.add(segmentRecord); }
