This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-topn in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 438e8054ea37ce8c149d89ef8685af23d1b6d11e Author: Megrez Lu <[email protected]> AuthorDate: Sat Feb 25 21:28:21 2023 +0800 registry TopN --- oap-server-bom/pom.xml | 2 +- .../plugin/banyandb/BanyanDBIndexInstaller.java | 7 ++- .../plugin/banyandb/BanyanDBStorageClient.java | 11 ++++ .../storage/plugin/banyandb/MetadataRegistry.java | 62 ++++++++++++++++++++-- 4 files changed, 77 insertions(+), 5 deletions(-) diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml index 289353d600..6be20194f7 100644 --- a/oap-server-bom/pom.xml +++ b/oap-server-bom/pom.xml @@ -72,7 +72,7 @@ <awaitility.version>3.0.0</awaitility.version> <httpcore.version>4.4.13</httpcore.version> <commons-compress.version>1.21</commons-compress.version> - <banyandb-java-client.version>0.3.0</banyandb-java-client.version> + <banyandb-java-client.version>0.4.0-SNAPSHOT</banyandb-java-client.version> <kafka-clients.version>2.8.1</kafka-clients.version> <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version> <consul.client.version>1.5.3</consul.client.version> diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index b765dd4f63..e96aed80c3 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -88,12 +88,17 @@ public class BanyanDBIndexInstaller extends ModelInstaller { } else { // measure Measure measure = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService); if (measure != null) { - log.info("install measure schema {}", model.getName()); + log.info("install measure schema {}", measure.name()); ((BanyanDBStorageClient) client).define(measure); + final BanyanDBClient c = ((BanyanDBStorageClient) this.client).client; + MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c); + log.info("installed TopN schema for measure {}", measure.name()); } } } catch (IOException ex) { throw new StorageException("fail to install schema", ex); + } catch (BanyanDBException ex) { + throw new StorageException("fail to install TopN schema", ex); } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index f2408f8695..73b527872d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -31,6 +31,7 @@ import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException import org.apache.skywalking.banyandb.v1.client.metadata.Measure; import org.apache.skywalking.banyandb.v1.client.metadata.Property; import org.apache.skywalking.banyandb.v1.client.metadata.Stream; +import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable; @@ -146,6 +147,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } + public void define(TopNAggregation topNAggregation) throws IOException { + try { + this.client.define(topNAggregation); + this.healthChecker.health(); + } catch (BanyanDBException ex) { + healthChecker.unHealth(ex); + throw new IOException("fail to define TopNAggregation", ex); + } + } + public void write(StreamWrite streamWrite) { this.client.write(streamWrite); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 5756bd710e..ed8bc0b42d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.gson.JsonObject; import io.grpc.Status; @@ -36,6 +37,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.Builder; import lombok.Data; @@ -47,6 +49,7 @@ import lombok.Setter; import lombok.Singular; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.v1.client.AbstractQuery; import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Catalog; @@ -59,11 +62,14 @@ import org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema; import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist; import org.apache.skywalking.banyandb.v1.client.metadata.Stream; import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec; +import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation; +import org.apache.skywalking.library.elasticsearch.requests.search.Sort; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.config.ConfigService; +import org.apache.skywalking.oap.server.core.query.enumeration.Order; import org.apache.skywalking.oap.server.core.query.enumeration.Step; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; @@ -72,6 +78,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject; +import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.StringUtil; @Slf4j @@ -132,7 +139,7 @@ public enum MetadataRegistry { // parse and set sharding keys List<String> shardingColumns = parseEntityNames(modelColumnMap); if (shardingColumns.isEmpty()) { - throw new StorageException("model " + model.getName() + " doesn't contain series id"); + throw new StorageException("model " + model.getName() + " doesn't contain series id"); } // parse tag metadata // this can be used to build both @@ -167,10 +174,26 @@ public enum MetadataRegistry { builder.addField(field); schemaBuilder.field(field.getName()); } + // parse TopN + schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields)); + registry.put(schemaMetadata.name(), schemaBuilder.build()); return builder.build(); } + private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata tagsAndFields) { + if (CollectionUtils.isEmpty(tagsAndFields.fields)) { + return null; + } + // TODO: how to configure parameters? + return TopNSpec.builder() + .lruSize(5) + .countersNumber(10) + .fieldName(tagsAndFields.fields.get(0).getName()) + .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and BottomN + .build(); + } + public Schema findMetadata(final Model model) { if (model.isRecord()) { return findRecordMetadata(model.getName()); @@ -339,8 +362,6 @@ public enum MetadataRegistry { */ MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder builder, List<String> shardingColumns) { // skip metric - Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE - .readValueColumnDefinition(model.getName()); MeasureMetadata.MeasureMetadataBuilder result = MeasureMetadata.builder(); for (final ModelColumn col : model.getColumns()) { final String columnStorageName = col.getColumnName().getStorageName(); @@ -575,6 +596,9 @@ public enum MetadataRegistry { } } + /** + * @return name of the Stream/Measure in the BanyanDB + */ public String name() { if (this.kind == Kind.MEASURE) { return formatName(this.modelName, this.downSampling); @@ -638,9 +662,41 @@ public enum MetadataRegistry { @Getter private final String timestampColumn4Stream; + @Getter + @Nullable + private final TopNSpec topNSpec; + public ColumnSpec getSpec(String columnName) { return this.specs.get(columnName); } + + public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException { + if (this.topNSpec == null) { + if (this.metadata.kind == Kind.MEASURE) { + log.debug("skip null TopN Schema for [{}]", metadata.getModelName()); + } + return; + } + client.define(TopNAggregation.create(getMetadata().getGroup(), getMetadata().name() + "_topn") + .setSourceMeasureName(getMetadata().name()) + .setFieldValueSort(topNSpec.sort) + .setFieldName(topNSpec.fieldName) + .setGroupByTagNames(topNSpec.groupByTagNames) + .setCountersNumber(topNSpec.countersNumber) + .setLruSize(topNSpec.lruSize) + .build()); + } + } + + @Builder + @EqualsAndHashCode + public static class TopNSpec { + @Singular + private final List<String> groupByTagNames; + private final String fieldName; + private final AbstractQuery.Sort sort; + private final int lruSize; + private final int countersNumber; } @RequiredArgsConstructor
