This is an automated email from the ASF dual-hosted git repository.

wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 47ce2720b9 BanyanDB: Support custom `TopN pre-aggregation` rules 
configuration in file `bydb-topn.yml`. (#13319)
47ce2720b9 is described below

commit 47ce2720b9be6af391138c2b84c4ec63c454a3b3
Author: Wan Kai <wankai...@foxmail.com>
AuthorDate: Tue Jun 17 12:49:11 2025 +0800

    BanyanDB: Support custom `TopN pre-aggregation` rules configuration in file 
`bydb-topn.yml`. (#13319)
---
 apm-dist/src/main/assembly/binary.xml              |  1 +
 docs/en/changes/changes.md                         |  3 +-
 docs/en/setup/backend/storages/banyandb.md         | 82 +++++++++++++++++++++
 .../skywalking/oal/rt/parser/SourceColumn.java     | 15 +++-
 .../oal/rt/parser/SourceColumnsFactory.java        |  2 +-
 .../skywalking/oal/rt/util/OALClassGenerator.java  |  8 +-
 .../core/browser/source/BrowserAppPagePerf.java    |  1 -
 .../core/browser/source/BrowserAppPageTraffic.java |  1 +
 .../browser/source/BrowserAppResourcePerf.java     |  1 -
 .../source/BrowserAppWebInteractionPerf.java       |  1 -
 .../browser/source/BrowserAppWebVitalsPerf.java    |  1 -
 .../oap/server/core/source/DefaultScopeDefine.java |  8 +-
 .../oap/server/core/source/Endpoint.java           |  2 +-
 .../oap/server/core/source/K8SEndpoint.java        |  2 +-
 .../oap/server/core/source/ScopeDefaultColumn.java | 20 +++--
 .../server/core/storage/annotation/BanyanDB.java   | 26 -------
 .../core/storage/model/BanyanDBModelExtension.java | 39 +---------
 .../server/core/storage/model/StorageModels.java   | 10 ---
 .../src/main/resources/bydb-topn.yml               | 82 +++++++++++++++++++++
 .../banyandb/BanyanDBAggregationQueryDAO.java      | 67 +++++++++++++----
 .../plugin/banyandb/BanyanDBConfigLoader.java      | 68 ++++++++++++++++-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    | 54 +++++---------
 .../plugin/banyandb/BanyanDBStorageConfig.java     | 56 ++++++++++++++
 .../storage/plugin/banyandb/MetadataRegistry.java  | 86 ++++++++++++++--------
 .../banyandb/stream/AbstractBanyanDBDAO.java       | 15 ++--
 .../server/storage/plugin/banyandb/BanyanDBIT.java |  2 +-
 26 files changed, 462 insertions(+), 191 deletions(-)
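
As a reading aid for the rule fields that the `bydb-topn.yml` hunks below document, here is a minimal sketch of one rule and its documented defaults. It is not the loader added by this commit: `TopNRuleSketch`, `Rule`, `withDefaults`, `groupedBy`, and `acceptedWindowMinutes` are hypothetical names, and the window arithmetic only applies the documented `[T - lruSize * n, T]` range, assuming `n` is expressed in minutes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: these names are hypothetical and do not come from SkyWalking.
public final class TopNRuleSketch {

    public enum Sort { ASC, DES, ALL }

    // One entry under `TopN-Rules`; optional fields carry the documented defaults.
    public static final class Rule {
        public final String name;                  // required, unique per rule
        public final String metricName;            // required
        public final List<String> groupByTagNames; // default: empty list
        public final int countersNumber;           // default: 1000
        public final int lruSizeMinute;            // default: 10
        public final int lruSizeHourDay;           // default: 2
        public final Sort sort;                    // default: ALL

        private Rule(String name, String metricName, List<String> tags, Sort sort) {
            if (name == null || name.isEmpty() || metricName == null || metricName.isEmpty()) {
                throw new IllegalArgumentException("name and metricName are required");
            }
            this.name = name;
            this.metricName = metricName;
            this.groupByTagNames = tags;
            this.countersNumber = 1000;
            this.lruSizeMinute = 10;
            this.lruSizeHourDay = 2;
            this.sort = sort;
        }

        // Builds a rule with every optional field left at its documented default.
        public static Rule withDefaults(String name, String metricName) {
            return new Rule(name, metricName, Collections.emptyList(), Sort.ALL);
        }

        // Returns a copy with an explicit sort order and group-by tags.
        public Rule groupedBy(Sort sort, String... tags) {
            return new Rule(name, metricName,
                Collections.unmodifiableList(Arrays.asList(tags)), sort);
        }
    }

    // Length of the accepted range [T - lruSize * n, T], with n given in minutes.
    public static long acceptedWindowMinutes(int lruSize, long intervalMinutes) {
        return lruSize * intervalMinutes;
    }

    public static void main(String[] args) {
        Rule rule = Rule.withDefaults("endpoint_cpm-service", "endpoint_cpm")
                        .groupedBy(Sort.DES, "service_id");
        System.out.println(rule.name + " groups by " + rule.groupByTagNames);
        System.out.println("minute window: "
            + acceptedWindowMinutes(rule.lruSizeMinute, 1) + " min");
        System.out.println("hour window: "
            + acceptedWindowMinutes(rule.lruSizeHourDay, 60) + " min");
    }
}
```

Under these assumptions, the default `lruSizeMinute` of 10 accepts data up to 10 minutes old for minute-level metrics, and the default `lruSizeHourDay` of 2 accepts data up to 120 minutes old for hour-level metrics.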

diff --git a/apm-dist/src/main/assembly/binary.xml 
b/apm-dist/src/main/assembly/binary.xml
index 7848d357dc..0f7a29454d 100644
--- a/apm-dist/src/main/assembly/binary.xml
+++ b/apm-dist/src/main/assembly/binary.xml
@@ -62,6 +62,7 @@
                 <include>hierarchy-definition.yml</include>
                 <include>bydb.dependencies.properties</include>
                 <include>bydb.yml</include>
+                <include>bydb-topn.yml</include>
                 <include>oal/*.oal</include>
                 <include>fetcher-prom-rules/*.yaml</include>
                 <include>envoy-metrics-rules/**</include>
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 6d53e69b08..22107feab8 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -12,7 +12,7 @@
 * PromQL Service: traffic query support `limit` and regex match.
 * Fix an edge case of HashCodeSelector(Integer#MIN_VALUE causes 
ArrayIndexOutOfBoundsException).
 * Support Flink monitoring.
-* BanyanDB: Support `@ShardingKey` for Measure tags and set to TopNAggregation 
group tag by default.
+* BanyanDB: Support `@ShardingKey` for Measure tags.
 * BanyanDB: Support cold stage data query for metrics/traces/logs.
 * Increase the idle check interval of the message queue to 200ms to reduce CPU 
usage under low load conditions.
 * Limit max attempts of DNS resolution of Istio ServiceEntry to 3, and do not 
wait for first resolution result in case the DNS is not resolvable at all.
@@ -31,6 +31,7 @@
 * Fix `disable.oal` does not work.
 * Enhance the stability of e2e PHP tests and update the PHP agent version.
 * Add component ID for the `dameng` JDBC driver.
+* BanyanDB: Support custom `TopN pre-aggregation` rules configuration in file 
`bydb-topn.yml`.
 
 #### UI
 
diff --git a/docs/en/setup/backend/storages/banyandb.md 
b/docs/en/setup/backend/storages/banyandb.md
index 0dbbcee375..5624f11252 100644
--- a/docs/en/setup/backend/storages/banyandb.md
+++ b/docs/en/setup/backend/storages/banyandb.md
@@ -233,6 +233,88 @@ groups:
     shardNum: ${SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM:1}
 
 ```
+### TopN Rules Configuration
+BanyanDB storage supports TopN pre-aggregation on the BanyanDB server side. 
Pre-aggregation uses more disk volume in exchange for lower CPU cost and 
faster TopN queries at query time. 
+You can define TopN rules for different metrics. The rules are configured in 
the `bydb-topn.yml` file:
+
+```yaml
+# This file is used to configure the TopN rules for BanyanDB in SkyWalking OAP 
server.
+# The rules define how to aggregate and sort `metrics (Measure)` for services, 
endpoints, and instances.
+#
+# - name: Required. The name of the TopN rule, uniquely identifies the rule.
+# - metricName: Required. The name of the metric to be aggregated.
+# - groupByTagNames: Optional, default `[]`. The tag names to group the 
metrics by. If not specified, the metrics are sorted without grouping.
+# - countersNumber: Optional, default `1000`. The max size of entries in a 
time window for the pre-aggregation.
+
+# The size of LRU determines the maximally tolerated time range.
+# The buffers in the time range are kept in the memory so that
+# the data in [T - lruSize * n, T] would be accepted in the pre-aggregation 
process.
+# T = the current time in the current dimensionality.
+# n = interval in the current dimensionality.
+# - lruSizeMinute: Optional, default `10`. Defines how many time_buckets are 
held in the memory for minute-level metrics.
+# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are 
held in the memory for hour and day-level metrics.
+
+# - sort: Optional, default `all`. The sorting order for the metrics: `asc`, 
`des`, or `all` (both `asc` and `des`).
+
+TopN-Rules:
+   # endpoint metrics
+   # `attr0` is defined in the `EndpointDecorator` as the Layer.
+  - name: endpoint_cpm
+    metricName: endpoint_cpm
+    sort: des
+  - name: endpoint_cpm-layer
+    metricName: endpoint_cpm
+    groupByTagNames:
+      - attr0
+    sort: des
+  - name: endpoint_cpm-service
+    metricName: endpoint_cpm
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: endpoint_sla
+    metricName: endpoint_sla
+    sort: asc
+  - name: endpoint_sla-layer
+    metricName: endpoint_sla
+    groupByTagNames:
+      - attr0
+    sort: asc
+  - name: endpoint_sla-service
+    metricName: endpoint_sla
+    groupByTagNames:
+      - service_id
+    sort: asc
+  - name: endpoint_resp_time
+    metricName: endpoint_resp_time
+    sort: des
+  - name: endpoint_resp_time-layer
+    metricName: endpoint_resp_time
+    groupByTagNames:
+      - attr0
+    sort: des
+  - name: endpoint_resp_time-service
+    metricName: endpoint_resp_time
+    groupByTagNames:
+      - service_id
+    sort: des
+  # browser_app_page_pv metrics
+  - name: browser_app_page_pv-service
+    metricName: browser_app_page_pv
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: browser_app_page_error_sum-service
+    metricName: browser_app_page_error_sum
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: browser_app_page_error_rate-service
+    metricName: browser_app_page_error_rate
+    groupByTagNames:
+      - service_id
+    sort: des
+```
 
 ### Installation Modes
 
diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
index c76ac3f803..479c9bafd3 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java
@@ -34,11 +34,11 @@ public class SourceColumn {
     private int length;
     private String fieldSetter;
     private String fieldGetter;
-    private final boolean groupByCondInTopN;
+    private final int shardingKeyIdx;
     private final boolean attribute;
 
     public SourceColumn(String fieldName, String columnName, Class<?> type, 
boolean isID, int length,
-                        boolean groupByCondInTopN, boolean attribute) {
+                        int shardingKeyIdx, boolean attribute) {
         this.fieldName = fieldName;
         this.columnName = columnName;
         this.type = type;
@@ -48,7 +48,7 @@ public class SourceColumn {
 
         this.fieldGetter = ClassMethodUtil.toGetMethod(fieldName);
         this.fieldSetter = ClassMethodUtil.toSetMethod(fieldName);
-        this.groupByCondInTopN = groupByCondInTopN;
+        this.shardingKeyIdx = shardingKeyIdx;
         this.attribute = attribute;
     }
 
@@ -82,9 +82,16 @@ public class SourceColumn {
         this.typeName = typeName;
     }
 
+    /**
+     * @return true if this column is part of the sharding key
+     */
+    public boolean isShardingKey() {
+        return this.shardingKeyIdx > -1;
+    }
+
     @Override
     public String toString() {
-        return "SourceColumn{" + "fieldName='" + fieldName + '\'' + ", 
columnName='" + columnName + '\'' + ", type=" + type + ", isID=" + isID + '}';
+        return "SourceColumn{" + "fieldName=" + fieldName + ", columnName=" + 
columnName + ", type=" + type + ", isID=" + isID + ", shardingKeyIdx=" + 
shardingKeyIdx + ", isAttribute=" + attribute + "}";
     }
 
     @Override
diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
index dadcbee205..494a1b650c 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java
@@ -31,7 +31,7 @@ public class SourceColumnsFactory {
         for (ScopeDefaultColumn defaultColumn : columns) {
             sourceColumns.add(
                 new SourceColumn(defaultColumn.getFieldName(), 
defaultColumn.getColumnName(), defaultColumn
-                    .getType(), defaultColumn.isID(), 
defaultColumn.getLength(), defaultColumn.isGroupByCondInTopN(), 
defaultColumn.isAttribute()));
+                    .getType(), defaultColumn.isID(), 
defaultColumn.getLength(), defaultColumn.getShardingKeyIdx(), 
defaultColumn.isAttribute()));
         }
         return sourceColumns;
     }
diff --git 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java
 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java
index f53dfcc5b4..da35514c0a 100644
--- 
a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java
+++ 
b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java
@@ -211,13 +211,9 @@ public class OALClassGenerator {
                     final var enableDocValuesAnnotation = new 
Annotation(ElasticSearch.EnableDocValues.class.getName(), constPool);
                     
annotationsAttribute.addAnnotation(enableDocValuesAnnotation);
                 }
-
-                if (field.isGroupByCondInTopN()) {
-                    Annotation banyanTopNAggregationAnnotation = new 
Annotation(BanyanDB.TopNAggregation.class.getName(), constPool);
-                    
annotationsAttribute.addAnnotation(banyanTopNAggregationAnnotation);
-                    // If TopN, add ShardingKey to group field.
+                if (field.isShardingKey()) {
                     Annotation banyanShardingKeyAnnotation = new 
Annotation(BanyanDB.ShardingKey.class.getName(), constPool);
-                    banyanShardingKeyAnnotation.addMemberValue("index", new 
IntegerMemberValue(constPool, 0));
+                    banyanShardingKeyAnnotation.addMemberValue("index", new 
IntegerMemberValue(constPool, field.getShardingKeyIdx()));
                     
annotationsAttribute.addAnnotation(banyanShardingKeyAnnotation);
                 }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
index adab7546cc..c9ded0962a 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java
@@ -41,7 +41,6 @@ public class BrowserAppPagePerf extends BrowserAppPerfSource {
 
     @Getter
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
     private String serviceId;
     @Getter
     @Setter
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java
index 1f279c3c13..745d9c008d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java
@@ -41,6 +41,7 @@ public class BrowserAppPageTraffic extends 
BrowserAppTrafficSource {
 
     @Getter
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
+    @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0)
     private String serviceId;
     @Getter
     @Setter
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java
index cda18af2e0..32894d24e2 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java
@@ -45,7 +45,6 @@ public class BrowserAppResourcePerf extends Source {
     }
 
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
     private String serviceId;
     @ScopeDefaultColumn.DefinedByField(columnName = "service_name", 
requireDynamicActive = true)
     private String serviceName;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java
index 50a6f84cee..2a303e33d1 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java
@@ -44,7 +44,6 @@ public class BrowserAppWebInteractionPerf extends Source {
     }
 
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
     private String serviceId;
     @ScopeDefaultColumn.DefinedByField(columnName = "service_name", 
requireDynamicActive = true)
     private String serviceName;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java
index 95c003596c..c7df9271ad 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java
@@ -45,7 +45,6 @@ public class BrowserAppWebVitalsPerf extends Source {
     }
 
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
     private String serviceId;
     @ScopeDefaultColumn.DefinedByField(columnName = "service_name", 
requireDynamicActive = true)
     private String serviceName;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 13bb43524d..1d51cc193e 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -232,7 +232,7 @@ public class DefaultScopeDefine {
         if (virtualColumn != null) {
             scopeDefaultColumns.add(
                 new ScopeDefaultColumn(virtualColumn.fieldName(), 
virtualColumn.columnName(), virtualColumn
-                    .type(), virtualColumn.isID(), virtualColumn.length(), 
false, false));
+                    .type(), virtualColumn.isID(), virtualColumn.length(), -1, 
false));
         }
         Field[] scopeClassField = originalClass.getDeclaredFields();
         if (scopeClassField != null) {
@@ -241,16 +241,16 @@ public class DefaultScopeDefine {
                     ScopeDefaultColumn.DefinedByField.class);
                 ScopeDefaultColumn.BanyanDB banyanDB = field.getAnnotation(
                     ScopeDefaultColumn.BanyanDB.class);
-                boolean groupByCondInTopN = false;
+                int shardingKeyIdx = -1;
                 if (banyanDB != null) {
-                    groupByCondInTopN = banyanDB.groupByCondInTopN();
+                    shardingKeyIdx = banyanDB.shardingKeyIdx();
                 }
                 if (definedByField != null) {
                     if (!definedByField.requireDynamicActive() || 
ACTIVE_EXTRA_MODEL_COLUMNS) {
                         scopeDefaultColumns.add(
                             new ScopeDefaultColumn(
                                 field.getName(), definedByField.columnName(), 
field.getType(), false,
-                                definedByField.length(), groupByCondInTopN, 
definedByField.isAttribute()
+                                definedByField.length(), shardingKeyIdx, 
definedByField.isAttribute()
                             ));
                     }
                 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
index acc5955f58..d1135a7640 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java
@@ -57,7 +57,7 @@ public class Endpoint extends Source {
     private String name;
     @Getter
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
+    @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0)
     private String serviceId;
     @Getter
     @Setter
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java
index 0f16aaa02d..f97f83e72b 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java
@@ -32,7 +32,7 @@ public class K8SEndpoint extends K8SMetrics.ProtocolMetrics {
     private volatile String entityId;
 
     @ScopeDefaultColumn.DefinedByField(columnName = "service_id")
-    @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true)
+    @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0)
     private String serviceId;
     @ScopeDefaultColumn.DefinedByField(columnName = "service_name", 
requireDynamicActive = true)
     private String serviceName;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
index 7688668087..f96f056107 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java
@@ -24,6 +24,7 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 
 /**
  * Define the default columns of source scope. These columns pass down into 
the persistent entity(OAL metrics entity)
@@ -36,16 +37,16 @@ public class ScopeDefaultColumn {
     private Class<?> type;
     private boolean isID;
     private int length;
-    private final boolean groupByCondInTopN;
+    private final int shardingKeyIdx;
     private final boolean attribute;
 
-    public ScopeDefaultColumn(String fieldName, String columnName, Class<?> 
type, boolean isID, int length, boolean groupByCondInTopN, boolean attribute) {
+    public ScopeDefaultColumn(String fieldName, String columnName, Class<?> 
type, boolean isID, int length, int shardingKeyIdx, boolean attribute) {
         this.fieldName = fieldName;
         this.columnName = columnName;
         this.type = type;
         this.isID = isID;
         this.length = length;
-        this.groupByCondInTopN = groupByCondInTopN;
+        this.shardingKeyIdx = shardingKeyIdx;
         this.attribute = attribute;
     }
 
@@ -80,12 +81,17 @@ public class ScopeDefaultColumn {
     @Retention(RetentionPolicy.RUNTIME)
     public @interface BanyanDB {
         /**
-         * Indicate whether this column is a condition for groupBy in the TopN 
Aggregation.
+         * ShardingKey is used to group time series data per metric in one 
place. Optional. Only supported for Measure tags.
+         * If ShardingKey is not set, the default ShardingKey is based on the 
combination of 'name' and 'entity' according to the {@link 
org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB.SeriesID}.
+         * <p>
+         * The typical scenario is to set the ShardingKey on the group tag 
when the metric generates a TopNAggregation:
+         * If not set, the default data distribution, based on the combination 
of 'name' and 'entity', can lead to performance issues when calculating the 
'TopNAggregation'.
+         * This is because each shard only has a subset of the top-n list, and 
the query process has to aggregate those lists to obtain 
the final result.
+         * This introduces overhead in terms of querying performance and disk 
usage.
          *
-         * @since 9.5.0
-         * @since 10.2.0 moved out from {@link DefinedByField} to {@link 
BanyanDB}
+         * @since 10.3.0
          */
-        boolean groupByCondInTopN() default false;
+        int shardingKeyIdx() default -1;
     }
 
     @Target({ElementType.TYPE})
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
index 41d702165c..8c6afc4771 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.core.storage.annotation;
 
 import java.lang.annotation.ElementType;
-import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
@@ -179,31 +178,6 @@ public @interface BanyanDB {
     @interface StoreIDAsTag {
     }
 
-    /**
-     * Generate a TopN Aggregation and use the annotated column as a groupBy 
tag.
-     * It also contains parameters for TopNAggregation
-     *
-     * @since 9.4.0
-     */
-    @Target({ElementType.FIELD})
-    @Retention(RetentionPolicy.RUNTIME)
-    @Inherited
-    @interface TopNAggregation {
-        /**
-         * The size of LRU determines the maximally tolerated time range.
-         * The buffers in the time range are kept in the memory so that
-         * the data in [T - lruSize * n, T] would be accepted in the 
pre-aggregation process.
-         * T = the current time in the current dimensionality.
-         * n = interval in the current dimensionality.
-         */
-        int lruSize() default 2;
-
-        /**
-         * The max size of entries in a time window for the pre-aggregation.
-         */
-        int countersNumber() default 1000;
-    }
-
     /**
      * Match query is designed for BanyanDB match query with specific 
analyzer. It is a fuzzy query implementation
      * powered by analyzer.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
index 307359755c..7adecdff90 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java
@@ -23,8 +23,6 @@ import lombok.Setter;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 
-import java.util.List;
-
 /**
  * BanyanDBExtension represents extra metadata for models, but specific for 
BanyanDB usages.
  *
@@ -49,10 +47,6 @@ public class BanyanDBModelExtension {
     @Setter
     private boolean storeIDTag;
 
-    @Getter
-    @Setter
-    private TopN topN;
-
     /**
      * indexMode indicates whether a metric is in the index mode.
      *
@@ -62,34 +56,7 @@ public class BanyanDBModelExtension {
     @Setter
     private boolean indexMode;
 
-    public static class TopN {
-        /**
-         * lru_size defines how many time_buckets are held in the memory.
-         * For example, "2" means data points belonging to the latest "2" 
time_buckets will be persisted.
-         * The default value is 2 in the BanyanDB if not set.
-         *
-         * @since 9.4.0
-         */
-        @Getter
-        @Setter
-        private int lruSize;
-
-        /**
-         * counters_number defines the max number of entries to be tracked 
during the pre-aggregation.
-         * The default value is 1000 in the BanyanDB if not set.
-         *
-         * @since 9.4.0
-         */
-        @Getter
-        @Setter
-        private int countersNumber;
-
-        @Setter
-        @Getter
-        private List<String> groupByTagNames;
-    }
-
-        @Setter
-        @Getter
-        private BanyanDB.StreamGroup streamGroup = 
BanyanDB.StreamGroup.RECORDS;
+    @Setter
+    @Getter
+    private BanyanDB.StreamGroup streamGroup = BanyanDB.StreamGroup.RECORDS;
 }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
index 059f989367..df85a92ed7 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java
@@ -231,8 +231,6 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
                     BanyanDB.IndexRule.class);
                 final BanyanDB.MeasureField banyanDBMeasureField = 
field.getAnnotation(
                     BanyanDB.MeasureField.class);
-                final BanyanDB.TopNAggregation topNAggregation = 
field.getAnnotation(
-                    BanyanDB.TopNAggregation.class);
                 final BanyanDB.MatchQuery analyzer = field.getAnnotation(
                     BanyanDB.MatchQuery.class);
                 final BanyanDB.EnableSort enableSort = field.getAnnotation(
@@ -248,14 +246,6 @@ public class StorageModels implements IModelManager, 
ModelCreator, ModelManipula
                     enableSort != null
                 );
 
-                if (topNAggregation != null) {
-                    BanyanDBModelExtension.TopN topN = new 
BanyanDBModelExtension.TopN();
-                    topN.setLruSize(topNAggregation.lruSize());
-                    topN.setCountersNumber(topNAggregation.countersNumber());
-                    
topN.setGroupByTagNames(Collections.singletonList(column.name()));
-                    banyanDBModelExtension.setTopN(topN);
-                }
-
                 final ModelColumn modelColumn = new ModelColumn(
                     new ColumnName(column),
                     field.getType(),
diff --git a/oap-server/server-starter/src/main/resources/bydb-topn.yml 
b/oap-server/server-starter/src/main/resources/bydb-topn.yml
new file mode 100644
index 0000000000..c20a87f41f
--- /dev/null
+++ b/oap-server/server-starter/src/main/resources/bydb-topn.yml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file is used to configure the TopN rules for BanyanDB in SkyWalking OAP 
server.
+# The rules define how to aggregate and sort `metrics (Measure)` for services, 
endpoints, and instances.
+#
+# - name: Required. The name of the TopN rule, uniquely identifies the rule.
+# - metricName: Required. The name of the metric to be aggregated.
+# - groupByTagNames: Optional, default `[]`. The tag names to group the 
metrics by. If not specified, the metrics are sorted without grouping.
+# - countersNumber: Optional, default `1000`. The max size of entries in a 
time window for the pre-aggregation.
+
+# The size of LRU determines the maximally tolerated time range.
+# The buffers in the time range are kept in the memory so that
+# the data in [T - lruSize * n, T] would be accepted in the pre-aggregation 
process.
+# T = the current time in the current dimensionality.
+# n = interval in the current dimensionality.
+# - lruSizeMinute: Optional, default `10`. Defines how many time_buckets are 
held in the memory for minute-level metrics.
+# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are 
held in the memory for hour and day-level metrics.
+
+# - sort: Optional, default `all`. The sorting order for the metrics: `asc`, 
`des`, or `all` (includes both `asc` and `des`).
+
+TopN-Rules:
+  # endpoint metrics
+  # `attr0` is defined in the `EndpointDecorator` as the Layer.
+  - name: endpoint_cpm-layer
+    metricName: endpoint_cpm
+    groupByTagNames:
+      - attr0
+    sort: des
+  - name: endpoint_cpm-service
+    metricName: endpoint_cpm
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: endpoint_sla-layer
+    metricName: endpoint_sla
+    groupByTagNames:
+      - attr0
+    sort: asc
+  - name: endpoint_sla-service
+    metricName: endpoint_sla
+    groupByTagNames:
+      - service_id
+    sort: asc
+  - name: endpoint_resp_time-layer
+    metricName: endpoint_resp_time
+    groupByTagNames:
+      - attr0
+    sort: des
+  - name: endpoint_resp_time-service
+    metricName: endpoint_resp_time
+    groupByTagNames:
+      - service_id
+    sort: des
+  # browser_app_page_pv metrics
+  - name: browser_app_page_pv-service
+    metricName: browser_app_page_pv
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: browser_app_page_error_sum-service
+    metricName: browser_app_page_error_sum
+    groupByTagNames:
+      - service_id
+    sort: des
+  - name: browser_app_page_error_rate-service
+    metricName: browser_app_page_error_rate
+    groupByTagNames:
+      - service_id
+    sort: des
\ No newline at end of file
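
The acceptance window described in the comments of `bydb-topn.yml`, `[T - lruSize * n, T]`, can be worked through with a small sketch. This is only an illustration of the arithmetic, assuming the interval is read literally with inclusive bounds; `LruWindowSketch`, `windowStart`, and `accepted` are hypothetical names and are not part of SkyWalking or BanyanDB.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: illustrates the [T - lruSize * n, T] window only.
public class LruWindowSketch {

    // Earliest timestamp still inside the window: T - lruSize * n.
    public static Instant windowStart(Instant now, int lruSize, Duration interval) {
        return now.minus(interval.multipliedBy(lruSize));
    }

    // True when the data point falls inside [T - lruSize * n, T] (inclusive bounds assumed).
    public static boolean accepted(Instant dataPoint, Instant now, int lruSize, Duration interval) {
        return !dataPoint.isBefore(windowStart(now, lruSize, interval)) && !dataPoint.isAfter(now);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2025-06-17T12:49:00Z");
        // Minute-level metrics with the default lruSizeMinute = 10.
        System.out.println(windowStart(now, 10, Duration.ofMinutes(1)));
        // Hour-level metrics with the default lruSizeHourDay = 2.
        System.out.println(windowStart(now, 2, Duration.ofHours(1)));
    }
}
```

With the defaults, a minute-level data point older than ten minutes, or an hour-level data point older than two hours, would fall outside the window in this reading.
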
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
index 0fd25a9fa8..3781e5c667 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java
@@ -19,6 +19,8 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
 import org.apache.skywalking.banyandb.v1.client.DataPoint;
 import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
@@ -27,6 +29,7 @@ import 
org.apache.skywalking.banyandb.v1.client.TimestampRange;
 import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.query.enumeration.Order;
+import org.apache.skywalking.oap.server.core.query.input.AttrCondition;
 import org.apache.skywalking.oap.server.core.query.input.Duration;
 import org.apache.skywalking.oap.server.core.query.input.TopNCondition;
 import org.apache.skywalking.oap.server.core.query.type.KeyValue;
@@ -41,7 +44,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO 
implements IAggregationQueryDAO {
     private static final Set<String> TAGS = ImmutableSet.of(Metrics.ENTITY_ID);
@@ -65,13 +67,52 @@ public class BanyanDBAggregationQueryDAO extends 
AbstractBanyanDBDAO implements
         }
 
         // BanyanDB server-side TopN support for metrics pre-aggregation.
-        if (schema.getTopNSpec() != null && 
CollectionUtils.isEmpty(condition.getAttributes())) {
-            // 1) no additional conditions
-            // 2) additional conditions are all group by tags
-            if (CollectionUtils.isEmpty(additionalConditions) ||
-                    
additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet())
-                            
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) {
-                return serverSideTopN(isColdStage, condition, schema, spec, 
getTimestampRange(duration), additionalConditions);
+        // The query tags are the additional conditions and attributes defined 
in the TopN condition.
+        // The query tags are the key used to find the TopN aggregation in the 
schema.
+        // If the TopN aggregation is defined in the schema, it will be used 
to perform the query.
+        // Server-side TopN is only supported when every attribute condition 
has `isEquals == true`.
+        ImmutableSet.Builder<String> queryTags = ImmutableSet.builder();
+        boolean equalsQuery = true;
+        if (condition.getAttributes() != null) {
+            for (AttrCondition attr : condition.getAttributes()) {
+                if (!attr.isEquals()) {
+                    equalsQuery = false;
+                    break;
+                }
+                queryTags.add(attr.getKey());
+            }
+        }
+        if (!equalsQuery) {
+            return directMetricsTopN(isColdStage, condition, schema, 
valueColumnName, spec, getTimestampRange(duration), additionalConditions);
+        }
+        if (additionalConditions != null) {
+            additionalConditions.forEach(additionalCondition -> 
queryTags.add(additionalCondition.getKey()));
+        }
+        if (schema.getTopNSpecs() != null) {
+            BanyandbDatabase.TopNAggregation topNAggregation = 
schema.getTopNSpecs().get(queryTags.build());
+            if (topNAggregation != null) {
+                BanyandbModel.Sort sort = topNAggregation.getFieldValueSort();
+                // If the TopN aggregation is defined in the schema, use it.
+                switch (condition.getOrder()) {
+                    case DES:
+                        if (sort == BanyandbModel.Sort.SORT_DESC || sort == 
BanyandbModel.Sort.SORT_UNSPECIFIED) {
+                            return serverSideTopN(
+                                isColdStage, condition, schema, spec, 
getTimestampRange(duration), additionalConditions,
+                                topNAggregation.getMetadata().getName(), 
AbstractQuery.Sort.DESC
+                            );
+                        }
+                        break;
+                    case ASC:
+                        if (sort == BanyandbModel.Sort.SORT_ASC || sort == 
BanyandbModel.Sort.SORT_UNSPECIFIED) {
+                            return serverSideTopN(
+                                isColdStage, condition, schema, spec, 
getTimestampRange(duration), additionalConditions,
+                                topNAggregation.getMetadata().getName(), 
AbstractQuery.Sort.ASC
+                            );
+                        }
+                        break;
+                    default:
+                        throw new IOException("Unsupported order: " + 
condition.getOrder());
+                }
             }
         }
 
@@ -79,13 +120,9 @@ public class BanyanDBAggregationQueryDAO extends 
AbstractBanyanDBDAO implements
     }
 
     List<SelectedRecord> serverSideTopN(boolean isColdStage, TopNCondition 
condition, MetadataRegistry.Schema schema, MetadataRegistry.ColumnSpec 
valueColumnSpec,
-                                        TimestampRange timestampRange, 
List<KeyValue> additionalConditions) throws IOException {
-        TopNQueryResponse resp = null;
-        if (condition.getOrder() == Order.DES) {
-            resp = topNQueryDebuggable(isColdStage, schema, timestampRange, 
condition.getTopN(), AbstractQuery.Sort.DESC, additionalConditions, 
condition.getAttributes());
-        } else {
-            resp = topNQueryDebuggable(isColdStage, schema, timestampRange, 
condition.getTopN(), AbstractQuery.Sort.ASC, additionalConditions, 
condition.getAttributes());
-        }
+                                        TimestampRange timestampRange, 
List<KeyValue> additionalConditions, String topNRuleName, AbstractQuery.Sort 
sort) throws IOException {
+        TopNQueryResponse resp;
+        resp = topNQueryDebuggable(isColdStage, schema, timestampRange, 
condition.getTopN(), sort, additionalConditions, condition.getAttributes(), 
topNRuleName);
         if (resp.size() == 0) {
             return Collections.emptyList();
         } else if (resp.size() > 1) { // since we have done aggregation, i.e. 
MEAN
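
The rule selection in `BanyanDBAggregationQueryDAO` above keys the pre-aggregation rules by the set of query tags and then checks that the rule's sort direction covers the requested order. A minimal stand-alone sketch of that idea follows. It uses plain `java.util` collections instead of Guava's `ImmutableSet`, and `TopNRuleLookupSketch`, `Rule`, `Sort`, and `Order` are hypothetical stand-ins rather than the SkyWalking or BanyanDB types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: look up a TopN rule by the exact set of group-by tags.
public class TopNRuleLookupSketch {
    public enum Sort { ALL, DES, ASC }
    public enum Order { DES, ASC }

    public static final class Rule {
        public final String name;
        public final Sort sort;

        public Rule(String name, Sort sort) {
            this.name = name;
            this.sort = sort;
        }
    }

    private final Map<Set<String>, Rule> rulesByGroupByTags = new HashMap<>();

    // Registers a rule; two rules for one metric must not share the same tag set.
    public void register(List<String> groupByTagNames, Rule rule) {
        Set<String> key = Set.copyOf(groupByTagNames);
        if (rulesByGroupByTags.putIfAbsent(key, rule) != null) {
            throw new IllegalArgumentException("Duplicate groupByTagNames " + key);
        }
    }

    // Returns a rule only if the tag set matches exactly and the sort covers the order.
    public Optional<Rule> find(Set<String> queryTags, Order order) {
        Rule rule = rulesByGroupByTags.get(queryTags);
        if (rule == null) {
            return Optional.empty();
        }
        boolean compatible = rule.sort == Sort.ALL
            || (order == Order.DES && rule.sort == Sort.DES)
            || (order == Order.ASC && rule.sort == Sort.ASC);
        return compatible ? Optional.of(rule) : Optional.empty();
    }
}
```

When `find` returns empty in this sketch, the caller would fall back to a direct metrics query, which mirrors the fall-through to `directMetricsTopN` in the diff.
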
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java
index ea4f931003..0104aa1177 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java
@@ -21,6 +21,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.banyandb;
 import java.io.FileNotFoundException;
 import java.io.Reader;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +29,7 @@ import 
org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
 import org.apache.skywalking.oap.server.library.module.ModuleProvider;
 import org.apache.skywalking.oap.server.library.module.ModuleStartException;
 import org.apache.skywalking.oap.server.library.util.ResourceUtils;
+import 
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageConfig.TopN;
 import org.yaml.snakeyaml.Yaml;
 
 import static 
org.apache.skywalking.oap.server.library.util.YamlConfigLoaderUtils.copyProperties;
@@ -36,16 +38,25 @@ import static 
org.apache.skywalking.oap.server.library.util.YamlConfigLoaderUtil
 @Slf4j
 public class BanyanDBConfigLoader {
     private final ModuleProvider moduleProvider;
+    private final BanyanDBStorageConfig config;
     private final Yaml yaml;
+    private final Yaml topNYaml;
 
     public BanyanDBConfigLoader(final ModuleProvider moduleProvider) {
         this.moduleProvider = moduleProvider;
+        this.config = new BanyanDBStorageConfig();
         this.yaml = new Yaml();
+        this.topNYaml = new Yaml();
     }
 
     public BanyanDBStorageConfig loadConfig() throws ModuleStartException {
-        BanyanDBStorageConfig config = new BanyanDBStorageConfig();
-        Reader applicationReader = null;
+        loadBaseConfig();
+        loadTopNConfig();
+        return config;
+    }
+
+    private void loadBaseConfig() throws ModuleStartException {
+        Reader applicationReader;
         try {
             applicationReader = ResourceUtils.read("bydb.yml");
         } catch (FileNotFoundException e) {
@@ -53,7 +64,7 @@ public class BanyanDBConfigLoader {
         }
         Map<String, Map<String, ?>> configMap = yaml.loadAs(applicationReader, 
Map.class);
         if (configMap == null) {
-            return config;
+            return;
         }
 
         Map<String, Properties> configProperties = new HashMap<>();
@@ -137,7 +148,6 @@ public class BanyanDBConfigLoader {
         } catch (IllegalAccessException e) {
             throw new ModuleStartException("Failed to load BanyanDB 
configuration.", e);
         }
-        return config;
     }
 
     private Properties parseConfig(final Map<String, ?> config) {
@@ -181,4 +191,54 @@ public class BanyanDBConfigLoader {
             groupResource.getAdditionalLifecycleStages().add(cold);
         }
     }
+
+    private void loadTopNConfig() throws ModuleStartException {
+        Reader applicationReader;
+        try {
+            applicationReader = ResourceUtils.read("bydb-topn.yml");
+        } catch (FileNotFoundException e) {
+            throw new ModuleStartException("Cannot find the BanyanDB topN 
configuration file [bydb-topn.yml].", e);
+        }
+        Map<String, List<Map<String, ?>>> configMap = 
topNYaml.loadAs(applicationReader, Map.class);
+        if (configMap == null || configMap.get("TopN-Rules") == null) {
+            return;
+        }
+        List<Map<String, ?>> topNConfig = configMap.get("TopN-Rules");
+        for (Map<String, ?> rule : topNConfig) {
+            TopN topN = new TopN();
+            var name = rule.get("name");
+            if (name == null) {
+                throw new ModuleStartException("TopN rule name is missing in 
file [bydb-topn.yml].");
+            }
+            var metricName = rule.get("metricName");
+            if (metricName == null) {
+                throw new ModuleStartException("TopN rule metricName is 
missing in file [bydb-topn.yml].");
+            }
+            topN.setName(name.toString());
+            var groupByTagNames = rule.get("groupByTagNames");
+            if (groupByTagNames != null) {
+                topN.setGroupByTagNames((List<String>) groupByTagNames);
+            }
+            var countersNumber = rule.get("countersNumber");
+            if (countersNumber != null) {
+                topN.setCountersNumber((int) countersNumber);
+            }
+            var lruSizeMinute = rule.get("lruSizeMinute");
+            if (lruSizeMinute != null) {
+                topN.setLruSizeMinute((int) lruSizeMinute);
+            }
+            var lruSizeHourDay = rule.get("lruSizeHourDay");
+            if (lruSizeHourDay != null) {
+                topN.setLruSizeHourDay((int) lruSizeHourDay);
+            }
+            var sort = rule.get("sort");
+            if (sort != null) {
+                topN.setSort(TopN.Sort.valueOf(sort.toString()));
+            }
+            Map<String, TopN> map = 
config.getTopNConfigs().computeIfAbsent(metricName.toString(), k -> new 
HashMap<>());
+            if (map.put(name.toString(), topN) != null) {
+                throw new ModuleStartException("Duplicate TopN rule name: " + 
name + " in file [bydb-topn.yml].");
+            }
+        }
+    }
 }
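
The rule parsing in `loadTopNConfig` above can be sketched in isolation: required keys are validated, and each optional key is written to its own field or left at its documented default. The sketch below works on an already-parsed map (as a YAML library would produce) so it stays self-contained. `TopNRuleParserSketch` and its `Rule` class are hypothetical and only mirror the field names and defaults documented in `bydb-topn.yml`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: map one parsed YAML rule onto a typed object with defaults.
public class TopNRuleParserSketch {

    public static final class Rule {
        public String name;
        public String metricName;
        public List<String> groupByTagNames = new ArrayList<>(); // default `[]`
        public int countersNumber = 1000;                        // default `1000`
        public int lruSizeMinute = 10;                           // default `10`
        public int lruSizeHourDay = 2;                           // default `2`
        public String sort = "all";                              // default `all`
    }

    public static Rule parse(Map<String, ?> raw) {
        Rule rule = new Rule();
        rule.name = required(raw, "name");
        rule.metricName = required(raw, "metricName");
        Object tags = raw.get("groupByTagNames");
        if (tags instanceof List) {
            for (Object tag : (List<?>) tags) {
                rule.groupByTagNames.add(String.valueOf(tag));
            }
        }
        // Each optional key is written to its own field.
        rule.countersNumber = intOrDefault(raw, "countersNumber", rule.countersNumber);
        rule.lruSizeMinute = intOrDefault(raw, "lruSizeMinute", rule.lruSizeMinute);
        rule.lruSizeHourDay = intOrDefault(raw, "lruSizeHourDay", rule.lruSizeHourDay);
        Object sort = raw.get("sort");
        if (sort != null) {
            rule.sort = sort.toString();
        }
        return rule;
    }

    private static String required(Map<String, ?> raw, String key) {
        Object value = raw.get(key);
        if (value == null) {
            throw new IllegalArgumentException("TopN rule " + key + " is missing");
        }
        return value.toString();
    }

    private static int intOrDefault(Map<String, ?> raw, String key, int fallback) {
        Object value = raw.get(key);
        return value instanceof Number ? ((Number) value).intValue() : fallback;
    }
}
```

Checking `instanceof Number` instead of casting to `int` is a design choice of this sketch: a malformed value falls back to the default rather than failing with a `ClassCastException`.
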
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index 90bebd4072..ba6ccf0450 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -195,18 +195,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
                             }
                         }
                         final MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(model);
-                        try {
-                            defineTopNAggregation(schema, client);
-                        } catch (BanyanDBException ex) {
-                            if 
(ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
-                                log.info("Measure schema {}_{} TopN({}) 
already created by another OAP node",
-                                    model.getName(),
-                                    model.getDownsampling(),
-                                    schema.getTopNSpec());
-                            } else {
-                                throw ex;
-                            }
-                        }
+                        defineTopNAggregation(schema, client);
                     }
                 }
             } else {
@@ -376,20 +365,22 @@ public class BanyanDBIndexInstaller extends 
ModelInstaller {
     }
 
     private void defineTopNAggregation(MetadataRegistry.Schema schema, 
BanyanDBClient client) throws BanyanDBException {
-        if (schema.getTopNSpec() == null) {
+        if (CollectionUtils.isEmpty(schema.getTopNSpecs())) {
             if (schema.getMetadata().getKind() == 
MetadataRegistry.Kind.MEASURE) {
                 log.debug("skip null TopN Schema for [{}]", 
schema.getMetadata().name());
             }
             return;
         }
-        try {
-            client.define(schema.getTopNSpec());
-            log.info("installed TopN schema for measure {}", 
schema.getMetadata().name());
-        } catch (BanyanDBException ex) {
-            if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
-                log.info("TopNAggregation {} already created by another OAP 
node", schema.getTopNSpec());
-            } else {
-                throw ex;
+        for (TopNAggregation topNSpec : schema.getTopNSpecs().values()) {
+            try {
+                client.define(topNSpec);
+                log.info("installed TopN schema for measure {}", 
schema.getMetadata().name());
+            } catch (BanyanDBException ex) {
+                if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
+                    log.info("TopNAggregation {} already created by another 
OAP node", topNSpec);
+                } else {
+                    throw ex;
+                }
             }
         }
     }
@@ -628,15 +619,16 @@ public class BanyanDBIndexInstaller extends 
ModelInstaller {
 
     /**
      * Check if the TopN aggregation exists and update it if necessary.
-     * If the old TopN aggregation is not in the schema, delete it.
+     * TODO: stale TopN aggregations cannot be deleted here yet.
      */
     private void checkTopNAggregation(Model model, BanyanDBClient client) 
throws BanyanDBException {
         MetadataRegistry.Schema schema = 
MetadataRegistry.INSTANCE.findMetadata(model);
-        String topNName = 
MetadataRegistry.Schema.formatTopNName(schema.getMetadata().name());
-        TopNAggregation hisTopNAggregation = 
client.findTopNAggregation(schema.getMetadata().getGroup(), topNName);
-
-        if (schema.getTopNSpec() != null) {
-            TopNAggregation topNAggregation = schema.getTopNSpec();
+        if (schema.getTopNSpecs() == null) {
+            return;
+        }
+        for (TopNAggregation topNAggregation : schema.getTopNSpecs().values()) 
{
+            String topNName = topNAggregation.getMetadata().getName();
+            TopNAggregation hisTopNAggregation = 
client.findTopNAggregation(schema.getMetadata().getGroup(), topNName);
             if (hisTopNAggregation == null) {
                 try {
                     client.define(topNAggregation);
@@ -662,14 +654,6 @@ public class BanyanDBIndexInstaller extends ModelInstaller 
{
                     );
                 }
             }
-        } else {
-            if (hisTopNAggregation != null) {
-                client.deleteTopNAggregation(schema.getMetadata().getGroup(), 
topNName);
-                log.info(
-                    "delete deprecated TopNAggregation: {} from group: {}", 
hisTopNAggregation.getMetadata().getName(),
-                    schema.getMetadata().getGroup()
-                );
-            }
         }
     }
 
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
index ea7944371b..e437841883 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java
@@ -21,9 +21,13 @@ package 
org.apache.skywalking.oap.server.storage.plugin.banyandb;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 
 @Getter
@@ -42,6 +46,8 @@ public class BanyanDBStorageConfig extends ModuleConfig {
     private Metadata metadata = new Metadata();
     private Property property = new Property();
 
+    private Map<String/*metric name*/, Map<String, TopN>> topNConfigs = new 
HashMap<>();
+
     public String[] getTargetArray() {
         return Iterables.toArray(
             
Splitter.on(",").omitEmptyStrings().trimResults().split(this.global.targets), 
String.class);
@@ -215,4 +221,54 @@ public class BanyanDBStorageConfig extends ModuleConfig {
     @Setter
     public static class Property extends BanyanDBStorageConfig.GroupResource {
     }
+
+    @Data
+    public static class TopN {
+        private String name;
+        /**
+         * The size of LRU determines the maximally tolerated time range.
+         * The buffers in the time range are kept in the memory so that
+         * the data in [T - lruSize * n, T] would be accepted in the 
pre-aggregation process.
+         * T = the current time in the current dimensionality.
+         * n = interval in the current dimensionality.
+         * lruSizeMinute defines how many time_buckets are held in the memory 
for minute-level metrics.
+         * For example, "10" means data points belonging to the latest "10" 
time_buckets will be persisted.
+         */
+        private int lruSizeMinute = 10;
+        /**
+         * lruSizeHourDay defines how many time_buckets are held in the memory 
for hour and day-level metrics.
+         * For example, "2" means data points belonging to the latest "2" 
time_buckets will be persisted.
+         */
+        private int lruSizeHourDay = 2;
+
+        /**
+         * countersNumber defines the max size of entries in a time window for 
the pre-aggregation.
+         */
+        private int countersNumber = 1000;
+
+        /**
+         * groupByTagNames defines the tags to be used for grouping the TopN 
results.
+         * If not set, the default is empty, meaning no grouping.
+         */
+        private List<String> groupByTagNames;
+
+        /**
+         * sort defines the sorting order of the TopN results.
+         * Default is "all", which includes both `des` and `asc`.
+         */
+        private Sort sort = Sort.all;
+
+        public enum Sort {
+            all(BanyandbModel.Sort.SORT_UNSPECIFIED),
+            des(BanyandbModel.Sort.SORT_DESC),
+            asc(BanyandbModel.Sort.SORT_ASC);
+
+            @Getter
+            private final BanyandbModel.Sort banyandbSort;
+
+            Sort(final BanyandbModel.Sort sort) {
+                this.banyandbSort = sort;
+            }
+        }
+    }
 }
diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 0d922cfc99..513dc3106e 100644
--- 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.gson.JsonObject;
 import lombok.Builder;
 import lombok.Data;
@@ -42,7 +43,6 @@ import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
 import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.apache.skywalking.oap.server.core.analysis.DownSampling;
 import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
@@ -142,11 +142,12 @@ public enum MetadataRegistry {
         MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, 
schemaBuilder, seriesIDColumns, schemaMetadata.group);
         List<TagFamilySpec> tagFamilySpecs = 
schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, 
model.getBanyanDBModelExtension().isStoreIDTag());
         // iterate over tagFamilySpecs to save tag names
-        for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) {
-            for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) {
-                schemaBuilder.tag(tagSpec.getName());
-            }
-        }
+        Set<String> tags = tagFamilySpecs.stream()
+                .flatMap(tagFamilySpec -> tagFamilySpec.getTagsList().stream())
+
+                .map(TagSpec::getName)
+                .collect(Collectors.toSet());
+        schemaBuilder.tags(tags);
         List<IndexRule> indexRules = tagsAndFields.tags.stream()
                 .map(TagMetadata::getIndexRule)
                 .filter(Objects::nonNull)
@@ -177,8 +178,11 @@ public enum MetadataRegistry {
             schemaBuilder.field(field.getName());
         }
         // parse TopN
-        schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, 
schemaMetadata.name()));
-
+        schemaBuilder.topNSpecs(parseTopNSpecs(
+            model, schemaMetadata.group, schemaMetadata.name(),
+            config.getTopNConfigs().get(model.getName()),
+            tags
+        ));
         registry.put(schemaMetadata.name(), schemaBuilder.build());
         return new MeasureModel(builder.build(), indexRules);
     }
@@ -196,9 +200,12 @@ public enum MetadataRegistry {
         return new PropertyModel(builder.build());
     }
 
-    private TopNAggregation parseTopNSpec(final Model model, final String 
group, final String measureName)
-            throws StorageException {
-        if (model.getBanyanDBModelExtension().getTopN() == null) {
+    private Map<ImmutableSet<String>, TopNAggregation> parseTopNSpecs(final 
Model model,
+                                                                      final 
String group,
+                                                                      final 
String measureName,
+                                                                      final 
Map<String, BanyanDBStorageConfig.TopN> topNConfig,
+                                                                      final 
Set<String> tags) throws StorageException {
+        if (topNConfig == null) {
             return null;
         }
 
@@ -207,20 +214,45 @@ public enum MetadataRegistry {
             // skip non-single valued metrics
             return null;
         }
+        Map<ImmutableSet<String>, TopNAggregation> topNAggregations = new 
HashMap<>();
+        topNConfig.forEach((name, topN) -> {
+            ImmutableSet<String> key = ImmutableSet.of();
+            TopNAggregation.Builder topNAggregation = 
TopNAggregation.newBuilder()
+                                                                     
.setMetadata(
+                                                                         
Metadata.newBuilder().setGroup(group).setName(name))
+                                                                     
.setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName))
+                                                                     
.setFieldValueSort(topN.getSort().getBanyandbSort())
+                                                                     
.setFieldName(valueColumnOpt.get().getValueCName())
+                                                                     
.setCountersNumber(topN.getCountersNumber());
+            if (topN.getGroupByTagNames() != null) {
+                key = ImmutableSet.copyOf(topN.getGroupByTagNames());
+                //check tags
+                topN.getGroupByTagNames().forEach(tag -> {
+                    if (!tags.contains(tag)) {
+                        throw new IllegalArgumentException(
+                            "In file [bydb-topn.yml], TopN rule " + 
topN.getName() + "'s groupByTagName [" + tag + "] is not defined in metric " + 
model.getName());
+                    }
+                });
+                
topNAggregation.addAllGroupByTagNames(topN.getGroupByTagNames());
+            }
 
-        if 
(CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()))
 {
-            throw new StorageException("invalid groupBy tags: " + 
model.getBanyanDBModelExtension().getTopN().getGroupByTagNames());
-        }
-        return TopNAggregation.newBuilder()
-                              .setMetadata(
-                                  
Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName)))
-                              
.setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName))
-                              
.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN 
and BottomN
-                              
.setFieldName(valueColumnOpt.get().getValueCName())
-                              
.addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
-                              
.setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
-                              
.setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
-                              .build();
+            switch (model.getDownsampling()) {
+                case Minute:
+                    topNAggregation.setLruSize(topN.getLruSizeMinute());
+                    break;
+                case Hour:
+                case Day:
+                    topNAggregation.setLruSize(topN.getLruSizeHourDay());
+                    break;
+                default:
+                    throw new UnsupportedOperationException("unsupported 
downsampling: " + model.getDownsampling());
+            }
+            if (topNAggregations.containsKey(key)) {
+                throw new IllegalArgumentException("In file [bydb-topn.yml], 
TopN rule " + topN.getName() + "'s groupByTagNames " + key + " already exists in 
the same metric " + model.getName());
+            }
+            topNAggregations.put(key, topNAggregation.build());
+        });
+        return topNAggregations;
     }
 
     public Schema findMetadata(final Model model) {
@@ -690,15 +722,11 @@ public enum MetadataRegistry {
 
         @Getter
         @Nullable
-        private final TopNAggregation topNSpec;
+        private final Map<ImmutableSet<String>/*groupBy tags*/, 
TopNAggregation> topNSpecs;
 
         public ColumnSpec getSpec(String columnName) {
             return this.specs.get(columnName);
         }
-
-        public static String formatTopNName(String measureName) {
-            return measureName + "_topn";
-        }
     }
 
     @RequiredArgsConstructor
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
index 2cdb5b158a..aed0ca700b 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
 
 import com.google.gson.Gson;
-import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
@@ -145,7 +144,8 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
                                                     int number,
                                                     AbstractQuery.Sort sort,
                                                     List<KeyValue> additionalConditions,
-                                                    List<AttrCondition> attributes) throws IOException {
+                                                    List<AttrCondition> attributes,
+                                                    String topNRuleName) throws IOException {
         DebuggingTraceContext traceContext = DebuggingTraceContext.TRACE_CONTEXT.get();
         DebuggingSpan span = null;
         try {
@@ -155,6 +155,8 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
                 builder.append("Condition: ")
                        .append("Schema: ")
                        .append(schema)
+                       .append(", TopNRuleName: ")
+                       .append(topNRuleName)
                        .append(", TimestampRange: ")
                        .append(timestampRange)
                        .append(", Number: ")
@@ -169,7 +171,7 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
                        .append(isColdStage);
                 span.setMsg(builder.toString());
             }
-            TopNQueryResponse response = topNQuery(isColdStage, schema, timestampRange, number, sort, additionalConditions, attributes);
+            TopNQueryResponse response = topNQuery(isColdStage, schema, timestampRange, number, sort, additionalConditions, attributes, topNRuleName);
             if (traceContext != null && traceContext.isDumpStorageRsp()) {
                 builder.append("\n").append(" Response: ").append(new Gson().toJson(response.getTopNLists()));
                 span.setMsg(builder.toString());
@@ -188,9 +190,10 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli
                                         int number,
                                         AbstractQuery.Sort sort,
                                         List<KeyValue> additionalConditions,
-                                        List<AttrCondition> attributes) throws IOException {
-        final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull(
-            schema.getTopNSpec()).getMetadata().getName(),
+                                        List<AttrCondition> attributes,
+                                        String topNRuleName) throws IOException {
+        final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()),
+                                          topNRuleName,
                                           timestampRange,
                                           number, sort);
         q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
index fe78339cbe..d47e265105 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java
@@ -280,7 +280,7 @@ public class BanyanDBIT {
     private static class TestMetric {
         @Column(name = "service_id")
         @BanyanDB.SeriesID(index = 0)
-        @BanyanDB.TopNAggregation
+        @BanyanDB.ShardingKey(index = 0)
         private String serviceId;
         @Column(name = "tag")
         @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)
