This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-topn
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 438e8054ea37ce8c149d89ef8685af23d1b6d11e
Author: Megrez Lu <[email protected]>
AuthorDate: Sat Feb 25 21:28:21 2023 +0800

    registry TopN
---
 oap-server-bom/pom.xml                             |  2 +-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  7 ++-
 .../plugin/banyandb/BanyanDBStorageClient.java     | 11 ++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 62 ++++++++++++++++++++--
 4 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/oap-server-bom/pom.xml b/oap-server-bom/pom.xml
index 289353d600..6be20194f7 100644
--- a/oap-server-bom/pom.xml
+++ b/oap-server-bom/pom.xml
@@ -72,7 +72,7 @@
         <awaitility.version>3.0.0</awaitility.version>
         <httpcore.version>4.4.13</httpcore.version>
         <commons-compress.version>1.21</commons-compress.version>
-        <banyandb-java-client.version>0.3.0</banyandb-java-client.version>
+        
<banyandb-java-client.version>0.4.0-SNAPSHOT</banyandb-java-client.version>
         <kafka-clients.version>2.8.1</kafka-clients.version>
         <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
         <consul.client.version>1.5.3</consul.client.version>
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index b765dd4f63..e96aed80c3 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -88,12 +88,17 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
             } else { // measure
                 Measure measure = 
MetadataRegistry.INSTANCE.registerMeasureModel(model, config, configService);
                 if (measure != null) {
-                    log.info("install measure schema {}", model.getName());
+                    log.info("install measure schema {}", measure.name());
                     ((BanyanDBStorageClient) client).define(measure);
+                    final BanyanDBClient c = ((BanyanDBStorageClient) 
this.client).client;
+                    
MetadataRegistry.INSTANCE.findMetadata(model).installTopNAggregation(c);
+                    log.info("installed TopN schema for measure {}", 
measure.name());
                 }
             }
         } catch (IOException ex) {
             throw new StorageException("fail to install schema", ex);
+        } catch (BanyanDBException ex) {
+            throw new StorageException("fail to install TopN schema", ex);
         }
     }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
index f2408f8695..73b527872d 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java
@@ -31,6 +31,7 @@ import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException
 import org.apache.skywalking.banyandb.v1.client.metadata.Measure;
 import org.apache.skywalking.banyandb.v1.client.metadata.Property;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
 import org.apache.skywalking.oap.server.library.client.Client;
 import 
org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
 import 
org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
@@ -146,6 +147,16 @@ public class BanyanDBStorageClient implements Client, 
HealthCheckable {
         }
     }
 
+    public void define(TopNAggregation topNAggregation) throws IOException {
+        try {
+            this.client.define(topNAggregation);
+            this.healthChecker.health();
+        } catch (BanyanDBException ex) {
+            healthChecker.unHealth(ex);
+            throw new IOException("fail to define TopNAggregation", ex);
+        }
+    }
+
     public void write(StreamWrite streamWrite) {
         this.client.write(streamWrite);
     }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 5756bd710e..ed8bc0b42d 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -20,6 +20,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.gson.JsonObject;
 import io.grpc.Status;
 
@@ -36,6 +37,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import lombok.Builder;
 import lombok.Data;
@@ -47,6 +49,7 @@ import lombok.Setter;
 import lombok.Singular;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
 import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
@@ -59,11 +62,14 @@ import 
org.apache.skywalking.banyandb.v1.client.metadata.NamedSchema;
 import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
 import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
 import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
+import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregation;
+import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.config.ConfigService;
+import org.apache.skywalking.oap.server.core.query.enumeration.Order;
 import org.apache.skywalking.oap.server.core.query.enumeration.Step;
 import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
@@ -72,6 +78,7 @@ import 
org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetad
 import org.apache.skywalking.oap.server.core.storage.model.Model;
 import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
 import 
org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.library.util.StringUtil;
 
 @Slf4j
@@ -132,7 +139,7 @@ public enum MetadataRegistry {
         // parse and set sharding keys
         List<String> shardingColumns = parseEntityNames(modelColumnMap);
         if (shardingColumns.isEmpty()) {
-           throw new StorageException("model " + model.getName() + " doesn't 
contain series id");
+            throw new StorageException("model " + model.getName() + " doesn't 
contain series id");
         }
         // parse tag metadata
         // this can be used to build both
@@ -167,10 +174,26 @@ public enum MetadataRegistry {
             builder.addField(field);
             schemaBuilder.field(field.getName());
         }
+        // parse TopN
+        schemaBuilder.topNSpec(parseTopNSpec(model, tagsAndFields));
+
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return builder.build();
     }
 
+    private TopNSpec parseTopNSpec(final Model model, final MeasureMetadata 
tagsAndFields) {
+        if (CollectionUtils.isEmpty(tagsAndFields.fields)) {
+            return null;
+        }
+        // TODO: how to configure parameters?
+        return TopNSpec.builder()
+                .lruSize(5)
+                .countersNumber(10)
+                .fieldName(tagsAndFields.fields.get(0).getName())
+                .sort(AbstractQuery.Sort.UNSPECIFIED) // include both TopN and 
BottomN
+                .build();
+    }
+
     public Schema findMetadata(final Model model) {
         if (model.isRecord()) {
             return findRecordMetadata(model.getName());
@@ -339,8 +362,6 @@ public enum MetadataRegistry {
      */
     MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder 
builder, List<String> shardingColumns) {
         // skip metric
-        Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = 
ValueColumnMetadata.INSTANCE
-                .readValueColumnDefinition(model.getName());
         MeasureMetadata.MeasureMetadataBuilder result = 
MeasureMetadata.builder();
         for (final ModelColumn col : model.getColumns()) {
             final String columnStorageName = 
col.getColumnName().getStorageName();
@@ -575,6 +596,9 @@ public enum MetadataRegistry {
             }
         }
 
+        /**
+         * @return name of the Stream/Measure in the BanyanDB
+         */
         public String name() {
             if (this.kind == Kind.MEASURE) {
                 return formatName(this.modelName, this.downSampling);
@@ -638,9 +662,41 @@ public enum MetadataRegistry {
         @Getter
         private final String timestampColumn4Stream;
 
+        @Getter
+        @Nullable
+        private final TopNSpec topNSpec;
+
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
         }
+
+        public void installTopNAggregation(BanyanDBClient client) throws 
BanyanDBException {
+            if (this.topNSpec == null) {
+                if (this.metadata.kind == Kind.MEASURE) {
+                    log.debug("skip null TopN Schema for [{}]", 
metadata.getModelName());
+                }
+                return;
+            }
+            client.define(TopNAggregation.create(getMetadata().getGroup(), 
getMetadata().name() + "_topn")
+                    .setSourceMeasureName(getMetadata().name())
+                    .setFieldValueSort(topNSpec.sort)
+                    .setFieldName(topNSpec.fieldName)
+                    .setGroupByTagNames(topNSpec.groupByTagNames)
+                    .setCountersNumber(topNSpec.countersNumber)
+                    .setLruSize(topNSpec.lruSize)
+                    .build());
+        }
+    }
+
+    @Builder
+    @EqualsAndHashCode
+    public static class TopNSpec {
+        @Singular
+        private final List<String> groupByTagNames;
+        private final String fieldName;
+        private final AbstractQuery.Sort sort;
+        private final int lruSize;
+        private final int countersNumber;
     }
 
     @RequiredArgsConstructor

Reply via email to