This is an automated email from the ASF dual-hosted git repository. wankai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new 47ce2720b9 BanyanDB: Support custom `TopN pre-aggregation` rules configuration in file `bydb-topn.yml`. (#13319) 47ce2720b9 is described below commit 47ce2720b9be6af391138c2b84c4ec63c454a3b3 Author: Wan Kai <wankai...@foxmail.com> AuthorDate: Tue Jun 17 12:49:11 2025 +0800 BanyanDB: Support custom `TopN pre-aggregation` rules configuration in file `bydb-topn.yml`. (#13319) --- apm-dist/src/main/assembly/binary.xml | 1 + docs/en/changes/changes.md | 3 +- docs/en/setup/backend/storages/banyandb.md | 82 +++++++++++++++++++++ .../skywalking/oal/rt/parser/SourceColumn.java | 15 +++- .../oal/rt/parser/SourceColumnsFactory.java | 2 +- .../skywalking/oal/rt/util/OALClassGenerator.java | 8 +- .../core/browser/source/BrowserAppPagePerf.java | 1 - .../core/browser/source/BrowserAppPageTraffic.java | 1 + .../browser/source/BrowserAppResourcePerf.java | 1 - .../source/BrowserAppWebInteractionPerf.java | 1 - .../browser/source/BrowserAppWebVitalsPerf.java | 1 - .../oap/server/core/source/DefaultScopeDefine.java | 8 +- .../oap/server/core/source/Endpoint.java | 2 +- .../oap/server/core/source/K8SEndpoint.java | 2 +- .../oap/server/core/source/ScopeDefaultColumn.java | 20 +++-- .../server/core/storage/annotation/BanyanDB.java | 26 ------- .../core/storage/model/BanyanDBModelExtension.java | 39 +--------- .../server/core/storage/model/StorageModels.java | 10 --- .../src/main/resources/bydb-topn.yml | 82 +++++++++++++++++++++ .../banyandb/BanyanDBAggregationQueryDAO.java | 67 +++++++++++++---- .../plugin/banyandb/BanyanDBConfigLoader.java | 68 ++++++++++++++++- .../plugin/banyandb/BanyanDBIndexInstaller.java | 54 +++++--------- .../plugin/banyandb/BanyanDBStorageConfig.java | 56 ++++++++++++++ .../storage/plugin/banyandb/MetadataRegistry.java | 86 ++++++++++++++-------- .../banyandb/stream/AbstractBanyanDBDAO.java | 15 ++-- .../server/storage/plugin/banyandb/BanyanDBIT.java | 2 +- 26 files changed, 462 
insertions(+), 191 deletions(-) diff --git a/apm-dist/src/main/assembly/binary.xml b/apm-dist/src/main/assembly/binary.xml index 7848d357dc..0f7a29454d 100644 --- a/apm-dist/src/main/assembly/binary.xml +++ b/apm-dist/src/main/assembly/binary.xml @@ -62,6 +62,7 @@ <include>hierarchy-definition.yml</include> <include>bydb.dependencies.properties</include> <include>bydb.yml</include> + <include>bydb-topn.yml</include> <include>oal/*.oal</include> <include>fetcher-prom-rules/*.yaml</include> <include>envoy-metrics-rules/**</include> diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index 6d53e69b08..22107feab8 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -12,7 +12,7 @@ * PromQL Service: traffic query support `limit` and regex match. * Fix an edge case of HashCodeSelector(Integer#MIN_VALUE causes ArrayIndexOutOfBoundsException). * Support Flink monitoring. -* BanyanDB: Support `@ShardingKey` for Measure tags and set to TopNAggregation group tag by default. +* BanyanDB: Support `@ShardingKey` for Measure tags. * BanyanDB: Support cold stage data query for metrics/traces/logs. * Increase the idle check interval of the message queue to 200ms to reduce CPU usage under low load conditions. * Limit max attempts of DNS resolution of Istio ServiceEntry to 3, and do not wait for first resolution result in case the DNS is not resolvable at all. @@ -31,6 +31,7 @@ * Fix `disable.oal` does not work. * Enhance the stability of e2e PHP tests and update the PHP agent version. * Add component ID for the `dameng` JDBC driver. +* BanyanDB: Support custom `TopN pre-aggregation` rules configuration in file `bydb-topn.yml`. 
#### UI diff --git a/docs/en/setup/backend/storages/banyandb.md b/docs/en/setup/backend/storages/banyandb.md index 0dbbcee375..5624f11252 100644 --- a/docs/en/setup/backend/storages/banyandb.md +++ b/docs/en/setup/backend/storages/banyandb.md @@ -233,6 +233,88 @@ groups: shardNum: ${SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM:1} ``` +### TopN Rules Configuration +The BanyanDB storage supports TopN pre-aggregation on the BanyanDB server side, which trades additional disk volume for pre-aggregation to save CPU cost and to perform faster queries at the query stage. +You can define the TopN rules for different metrics. The configuration is defined in the `bydb-topn.yml` file: + +```yaml +# This file is used to configure the TopN rules for BanyanDB in SkyWalking OAP server. +# The rules define how to aggregate and sort `metrics (Measure)` for services, endpoints, and instances. +# +# - name: Required. The name of the TopN rule, uniquely identifies the rule. +# - metricName: Required. The name of the metric to be aggregated. +# - groupByTagNames: Optional, default `[]`. The tag names to group the metrics by. If not specified, the metrics will be sorted without grouping. +# - countersNumber: Optional, default `1000`. The max size of entries in a time window for the pre-aggregation. + +# The size of LRU determines the maximally tolerated time range. +# The buffers in the time range are kept in the memory so that +# the data in [T - lruSize * n, T] would be accepted in the pre-aggregation process. +# T = the current time in the current dimensionality. +# n = interval in the current dimensionality. +# - lruSizeMinute: Optional, default `10`. Defines how many time_buckets are held in the memory for minute-level metrics. +# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are held in the memory for hour and day-level metrics. + +# - sort: Optional, default `all`. The sorting order for the metrics: `asc`, `des`, or `all` (includes both asc and des). 
+ +TopN-Rules: + # endpoint metrics + # `attr0` is defined in the `EndpointDecorator` as the Layer. + - name: endpoint_cpm + metricName: endpoint_cpm + sort: des + - name: endpoint_cpm-layer + metricName: endpoint_cpm + groupByTagNames: + - attr0 + sort: des + - name: endpoint_cpm-service + metricName: endpoint_cpm + groupByTagNames: + - service_id + sort: des + - name: endpoint_sla + metricName: endpoint_sla + sort: asc + - name: endpoint_sla-layer + metricName: endpoint_sla + groupByTagNames: + - attr0 + sort: asc + - name: endpoint_sla-service + metricName: endpoint_sla + groupByTagNames: + - service_id + sort: asc + - name: endpoint_resp_time + metricName: endpoint_resp_time + sort: des + - name: endpoint_resp_time-layer + metricName: endpoint_resp_time + groupByTagNames: + - attr0 + sort: des + - name: endpoint_resp_time-service + metricName: endpoint_resp_time + groupByTagNames: + - service_id + sort: des + # browser_app_page_pv metrics + - name: browser_app_page_pv-service + metricName: browser_app_page_pv + groupByTagNames: + - service_id + sort: des + - name: browser_app_page_error_sum-service + metricName: browser_app_page_error_sum + groupByTagNames: + - service_id + sort: des + - name: browser_app_page_error_rate-service + metricName: browser_app_page_error_rate + groupByTagNames: + - service_id + sort: des +``` ### Installation Modes diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java index c76ac3f803..479c9bafd3 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumn.java @@ -34,11 +34,11 @@ public class SourceColumn { private int length; private String fieldSetter; private String fieldGetter; - private final boolean groupByCondInTopN; + private final int shardingKeyIdx; private final 
boolean attribute; public SourceColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length, - boolean groupByCondInTopN, boolean attribute) { + int shardingKeyIdx, boolean attribute) { this.fieldName = fieldName; this.columnName = columnName; this.type = type; @@ -48,7 +48,7 @@ public class SourceColumn { this.fieldGetter = ClassMethodUtil.toGetMethod(fieldName); this.fieldSetter = ClassMethodUtil.toSetMethod(fieldName); - this.groupByCondInTopN = groupByCondInTopN; + this.shardingKeyIdx = shardingKeyIdx; this.attribute = attribute; } @@ -82,9 +82,16 @@ public class SourceColumn { this.typeName = typeName; } + /** + * @return true if this column is a part of sharding key + */ + public boolean isShardingKey() { + return this.shardingKeyIdx > -1; + } + @Override public String toString() { - return "SourceColumn{" + "fieldName='" + fieldName + '\'' + ", columnName='" + columnName + '\'' + ", type=" + type + ", isID=" + isID + '}'; + return "SourceColumn{" + "fieldName=" + fieldName + ", columnName=" + columnName + ", type=" + type + ", isID=" + isID + ", shardingKeyIdx=" + shardingKeyIdx + ", isAttribute=" + attribute + "}"; } @Override diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java index dadcbee205..494a1b650c 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/parser/SourceColumnsFactory.java @@ -31,7 +31,7 @@ public class SourceColumnsFactory { for (ScopeDefaultColumn defaultColumn : columns) { sourceColumns.add( new SourceColumn(defaultColumn.getFieldName(), defaultColumn.getColumnName(), defaultColumn - .getType(), defaultColumn.isID(), defaultColumn.getLength(), defaultColumn.isGroupByCondInTopN(), defaultColumn.isAttribute())); + .getType(), defaultColumn.isID(), 
defaultColumn.getLength(), defaultColumn.getShardingKeyIdx(), defaultColumn.isAttribute())); } return sourceColumns; } diff --git a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java index f53dfcc5b4..da35514c0a 100644 --- a/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java +++ b/oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java @@ -211,13 +211,9 @@ public class OALClassGenerator { final var enableDocValuesAnnotation = new Annotation(ElasticSearch.EnableDocValues.class.getName(), constPool); annotationsAttribute.addAnnotation(enableDocValuesAnnotation); } - - if (field.isGroupByCondInTopN()) { - Annotation banyanTopNAggregationAnnotation = new Annotation(BanyanDB.TopNAggregation.class.getName(), constPool); - annotationsAttribute.addAnnotation(banyanTopNAggregationAnnotation); - // If TopN, add ShardingKey to group field. 
+ if (field.isShardingKey()) { Annotation banyanShardingKeyAnnotation = new Annotation(BanyanDB.ShardingKey.class.getName(), constPool); - banyanShardingKeyAnnotation.addMemberValue("index", new IntegerMemberValue(constPool, 0)); + banyanShardingKeyAnnotation.addMemberValue("index", new IntegerMemberValue(constPool, field.getShardingKeyIdx())); annotationsAttribute.addAnnotation(banyanShardingKeyAnnotation); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java index adab7546cc..c9ded0962a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPagePerf.java @@ -41,7 +41,6 @@ public class BrowserAppPagePerf extends BrowserAppPerfSource { @Getter @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) private String serviceId; @Getter @Setter diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java index 1f279c3c13..745d9c008d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppPageTraffic.java @@ -41,6 +41,7 @@ public class BrowserAppPageTraffic extends BrowserAppTrafficSource { @Getter @ScopeDefaultColumn.DefinedByField(columnName = "service_id") + @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0) private String serviceId; @Getter @Setter diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java index cda18af2e0..32894d24e2 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppResourcePerf.java @@ -45,7 +45,6 @@ public class BrowserAppResourcePerf extends Source { } @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) private String serviceId; @ScopeDefaultColumn.DefinedByField(columnName = "service_name", requireDynamicActive = true) private String serviceName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java index 50a6f84cee..2a303e33d1 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebInteractionPerf.java @@ -44,7 +44,6 @@ public class BrowserAppWebInteractionPerf extends Source { } @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) private String serviceId; @ScopeDefaultColumn.DefinedByField(columnName = "service_name", requireDynamicActive = true) private String serviceName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java index 95c003596c..c7df9271ad 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/browser/source/BrowserAppWebVitalsPerf.java @@ -45,7 +45,6 @@ public class BrowserAppWebVitalsPerf extends Source { } @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) private String serviceId; @ScopeDefaultColumn.DefinedByField(columnName = "service_name", requireDynamicActive = true) private String serviceName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java index 13bb43524d..1d51cc193e 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java @@ -232,7 +232,7 @@ public class DefaultScopeDefine { if (virtualColumn != null) { scopeDefaultColumns.add( new ScopeDefaultColumn(virtualColumn.fieldName(), virtualColumn.columnName(), virtualColumn - .type(), virtualColumn.isID(), virtualColumn.length(), false, false)); + .type(), virtualColumn.isID(), virtualColumn.length(), -1, false)); } Field[] scopeClassField = originalClass.getDeclaredFields(); if (scopeClassField != null) { @@ -241,16 +241,16 @@ public class DefaultScopeDefine { ScopeDefaultColumn.DefinedByField.class); ScopeDefaultColumn.BanyanDB banyanDB = field.getAnnotation( ScopeDefaultColumn.BanyanDB.class); - boolean groupByCondInTopN = false; + int shardingKeyIdx = -1; if (banyanDB != null) { - groupByCondInTopN = 
banyanDB.groupByCondInTopN(); + shardingKeyIdx = banyanDB.shardingKeyIdx(); } if (definedByField != null) { if (!definedByField.requireDynamicActive() || ACTIVE_EXTRA_MODEL_COLUMNS) { scopeDefaultColumns.add( new ScopeDefaultColumn( field.getName(), definedByField.columnName(), field.getType(), false, - definedByField.length(), groupByCondInTopN, definedByField.isAttribute() + definedByField.length(), shardingKeyIdx, definedByField.isAttribute() )); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java index acc5955f58..d1135a7640 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Endpoint.java @@ -57,7 +57,7 @@ public class Endpoint extends Source { private String name; @Getter @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) + @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0) private String serviceId; @Getter @Setter diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java index 0f16aaa02d..f97f83e72b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/K8SEndpoint.java @@ -32,7 +32,7 @@ public class K8SEndpoint extends K8SMetrics.ProtocolMetrics { private volatile String entityId; @ScopeDefaultColumn.DefinedByField(columnName = "service_id") - @ScopeDefaultColumn.BanyanDB(groupByCondInTopN = true) + @ScopeDefaultColumn.BanyanDB(shardingKeyIdx = 0) private String serviceId; 
@ScopeDefaultColumn.DefinedByField(columnName = "service_name", requireDynamicActive = true) private String serviceName; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java index 7688668087..f96f056107 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ScopeDefaultColumn.java @@ -24,6 +24,7 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; /** * Define the default columns of source scope. These columns pass down into the persistent entity(OAL metrics entity) @@ -36,16 +37,16 @@ public class ScopeDefaultColumn { private Class<?> type; private boolean isID; private int length; - private final boolean groupByCondInTopN; + private final int shardingKeyIdx; private final boolean attribute; - public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length, boolean groupByCondInTopN, boolean attribute) { + public ScopeDefaultColumn(String fieldName, String columnName, Class<?> type, boolean isID, int length, int shardingKeyIdx, boolean attribute) { this.fieldName = fieldName; this.columnName = columnName; this.type = type; this.isID = isID; this.length = length; - this.groupByCondInTopN = groupByCondInTopN; + this.shardingKeyIdx = shardingKeyIdx; this.attribute = attribute; } @@ -80,12 +81,17 @@ public class ScopeDefaultColumn { @Retention(RetentionPolicy.RUNTIME) public @interface BanyanDB { /** - * Indicate whether this column is a condition for groupBy in the TopN Aggregation. 
+ * ShardingKey is used to group time series data per metric in one place. Optional. Only supported for Measure tags. + * If ShardingKey is not set, the default ShardingKey is based on the combination of 'name' and 'entity' according to the {@link org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB.SeriesID}. + * <p> + * The typical scenario is to set the ShardingKey on the group tag when the metric generates a TopNAggregation: + * If not set, the default data distribution, based on the combination of 'name' and 'entity', can lead to performance issues when calculating the 'TopNAggregation'. + * This is because each shard only has a subset of the top-n list, and the query process has to be responsible for aggregating those lists to obtain the final result. + * This introduces overhead in terms of querying performance and disk usage. * - * @since 9.5.0 - * @since 10.2.0 moved out from {@link DefinedByField} to {@link BanyanDB} + * @since 10.3.0 */ - boolean groupByCondInTopN() default false; + int shardingKeyIdx() default -1; } @Target({ElementType.TYPE}) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java index 41d702165c..8c6afc4771 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java @@ -19,7 +19,6 @@ package org.apache.skywalking.oap.server.core.storage.annotation; import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -179,31 +178,6 @@ public @interface BanyanDB { @interface StoreIDAsTag { } - /** - * Generate a TopN Aggregation and use the annotated 
column as a groupBy tag. - * It also contains parameters for TopNAggregation - * - * @since 9.4.0 - */ - @Target({ElementType.FIELD}) - @Retention(RetentionPolicy.RUNTIME) - @Inherited - @interface TopNAggregation { - /** - * The size of LRU determines the maximally tolerated time range. - * The buffers in the time range are kept in the memory so that - * the data in [T - lruSize * n, T] would be accepted in the pre-aggregation process. - * T = the current time in the current dimensionality. - * n = interval in the current dimensionality. - */ - int lruSize() default 2; - - /** - * The max size of entries in a time window for the pre-aggregation. - */ - int countersNumber() default 1000; - } - /** * Match query is designed for BanyanDB match query with specific analyzer. It is a fuzzy query implementation * powered by analyzer. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java index 307359755c..7adecdff90 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBModelExtension.java @@ -23,8 +23,6 @@ import lombok.Setter; import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; -import java.util.List; - /** * BanyanDBExtension represents extra metadata for models, but specific for BanyanDB usages. * @@ -49,10 +47,6 @@ public class BanyanDBModelExtension { @Setter private boolean storeIDTag; - @Getter - @Setter - private TopN topN; - /** * indexMode indicates whether a metric is in the index mode. 
* @@ -62,34 +56,7 @@ public class BanyanDBModelExtension { @Setter private boolean indexMode; - public static class TopN { - /** - * lru_size defines how many time_buckets are held in the memory. - * For example, "2" means data points belonging to the latest "2" time_buckets will be persisted. - * The default value is 2 in the BanyanDB if not set. - * - * @since 9.4.0 - */ - @Getter - @Setter - private int lruSize; - - /** - * counters_number defines the max number of entries to be tracked during the pre-aggregation. - * The default value is 1000 in the BanyanDB if not set. - * - * @since 9.4.0 - */ - @Getter - @Setter - private int countersNumber; - - @Setter - @Getter - private List<String> groupByTagNames; - } - - @Setter - @Getter - private BanyanDB.StreamGroup streamGroup = BanyanDB.StreamGroup.RECORDS; + @Setter + @Getter + private BanyanDB.StreamGroup streamGroup = BanyanDB.StreamGroup.RECORDS; } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java index 059f989367..df85a92ed7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java @@ -231,8 +231,6 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula BanyanDB.IndexRule.class); final BanyanDB.MeasureField banyanDBMeasureField = field.getAnnotation( BanyanDB.MeasureField.class); - final BanyanDB.TopNAggregation topNAggregation = field.getAnnotation( - BanyanDB.TopNAggregation.class); final BanyanDB.MatchQuery analyzer = field.getAnnotation( BanyanDB.MatchQuery.class); final BanyanDB.EnableSort enableSort = field.getAnnotation( @@ -248,14 +246,6 @@ public class StorageModels implements IModelManager, ModelCreator, ModelManipula 
enableSort != null ); - if (topNAggregation != null) { - BanyanDBModelExtension.TopN topN = new BanyanDBModelExtension.TopN(); - topN.setLruSize(topNAggregation.lruSize()); - topN.setCountersNumber(topNAggregation.countersNumber()); - topN.setGroupByTagNames(Collections.singletonList(column.name())); - banyanDBModelExtension.setTopN(topN); - } - final ModelColumn modelColumn = new ModelColumn( new ColumnName(column), field.getType(), diff --git a/oap-server/server-starter/src/main/resources/bydb-topn.yml b/oap-server/server-starter/src/main/resources/bydb-topn.yml new file mode 100644 index 0000000000..c20a87f41f --- /dev/null +++ b/oap-server/server-starter/src/main/resources/bydb-topn.yml @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file is used to configure the TopN rules for BanyanDB in SkyWalking OAP server. +# The rules define how to aggregate and sort `metrics (Measure)` for services, endpoints, and instances. +# +# - name: Required. The name of the TopN rule, uniquely identifies the rule. +# - metricName: Required. The name of the metric to be aggregated. +# - groupByTagNames: Optional, default `[]`. The tag names to group the metrics by. If not specified, the metrics will be sorted without grouping. 
+# - countersNumber: Optional, default `1000`. The max size of entries in a time window for the pre-aggregation. + +# The size of LRU determines the maximally tolerated time range. +# The buffers in the time range are kept in the memory so that +# the data in [T - lruSize * n, T] would be accepted in the pre-aggregation process. +# T = the current time in the current dimensionality. +# n = interval in the current dimensionality. +# - lruSizeMinute: Optional, default `10`. Defines how many time_buckets are held in the memory for minute-level metrics. +# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are held in the memory for hour and day-level metrics. + +# - sort: Optional, default `all`. The sorting order for the metrics, asc, des or all(include both asc and des). + +TopN-Rules: + # endpoint metrics + # `attr0` is defined in the `EndpointDecorator` as the Layer. + - name: endpoint_cpm-layer + metricName: endpoint_cpm + groupByTagNames: + - attr0 + sort: des + - name: endpoint_cpm-service + metricName: endpoint_cpm + groupByTagNames: + - service_id + sort: des + - name: endpoint_sla-layer + metricName: endpoint_sla + groupByTagNames: + - attr0 + sort: asc + - name: endpoint_sla-service + metricName: endpoint_sla + groupByTagNames: + - service_id + sort: asc + - name: endpoint_resp_time-layer + metricName: endpoint_resp_time + groupByTagNames: + - attr0 + sort: des + - name: endpoint_resp_time-service + metricName: endpoint_resp_time + groupByTagNames: + - service_id + sort: des + # browser_app_page_pv metrics + - name: browser_app_page_pv-service + metricName: browser_app_page_pv + groupByTagNames: + - service_id + sort: des + - name: browser_app_page_error_sum-service + metricName: browser_app_page_error_sum + groupByTagNames: + - service_id + sort: des + - name: browser_app_page_error_rate-service + metricName: browser_app_page_error_rate + groupByTagNames: + - service_id + sort: des \ No newline at end of file diff --git 
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java index 0fd25a9fa8..3781e5c667 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java @@ -19,6 +19,8 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import com.google.common.collect.ImmutableSet; +import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase; +import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.banyandb.v1.client.AbstractQuery; import org.apache.skywalking.banyandb.v1.client.DataPoint; import org.apache.skywalking.banyandb.v1.client.MeasureQuery; @@ -27,6 +29,7 @@ import org.apache.skywalking.banyandb.v1.client.TimestampRange; import org.apache.skywalking.banyandb.v1.client.TopNQueryResponse; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.query.enumeration.Order; +import org.apache.skywalking.oap.server.core.query.input.AttrCondition; import org.apache.skywalking.oap.server.core.query.input.Duration; import org.apache.skywalking.oap.server.core.query.input.TopNCondition; import org.apache.skywalking.oap.server.core.query.type.KeyValue; @@ -41,7 +44,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements IAggregationQueryDAO { private static final 
Set<String> TAGS = ImmutableSet.of(Metrics.ENTITY_ID); @@ -65,13 +67,52 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements } // BanyanDB server-side TopN support for metrics pre-aggregation. - if (schema.getTopNSpec() != null && CollectionUtils.isEmpty(condition.getAttributes())) { - // 1) no additional conditions - // 2) additional conditions are all group by tags - if (CollectionUtils.isEmpty(additionalConditions) || - additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet()) - .equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) { - return serverSideTopN(isColdStage, condition, schema, spec, getTimestampRange(duration), additionalConditions); + // The query tags are the additional conditions and attributes defined in the TopN condition. + // The query tags is the key to find the TopN aggregation in the schema. + // If the TopN aggregation is defined in the schema, it will be used to perform the query. + // The server-side TopN only support when attribute condition `isEquals == true`. + ImmutableSet.Builder<String> queryTags = ImmutableSet.builder(); + boolean equalsQuery = true; + if (condition.getAttributes() != null) { + for (AttrCondition attr : condition.getAttributes()) { + if (!attr.isEquals()) { + equalsQuery = false; + break; + } + queryTags.add(attr.getKey()); + } + } + if (!equalsQuery) { + return directMetricsTopN(isColdStage, condition, schema, valueColumnName, spec, getTimestampRange(duration), additionalConditions); + } + if (additionalConditions != null) { + additionalConditions.forEach(additionalCondition -> queryTags.add(additionalCondition.getKey())); + } + if (schema.getTopNSpecs() != null) { + BanyandbDatabase.TopNAggregation topNAggregation = schema.getTopNSpecs().get(queryTags.build()); + if (topNAggregation != null) { + BanyandbModel.Sort sort = topNAggregation.getFieldValueSort(); + // If the TopN aggregation is defined in the schema, use it. 
+ switch (condition.getOrder()) { + case DES: + if (sort == BanyandbModel.Sort.SORT_DESC || sort == BanyandbModel.Sort.SORT_UNSPECIFIED) { + return serverSideTopN( + isColdStage, condition, schema, spec, getTimestampRange(duration), additionalConditions, + topNAggregation.getMetadata().getName(), AbstractQuery.Sort.DESC + ); + } + break; + case ASC: + if (sort == BanyandbModel.Sort.SORT_ASC || sort == BanyandbModel.Sort.SORT_UNSPECIFIED) { + return serverSideTopN( + isColdStage, condition, schema, spec, getTimestampRange(duration), additionalConditions, + topNAggregation.getMetadata().getName(), AbstractQuery.Sort.ASC + ); + } + break; + default: + throw new IOException("Unsupported order: " + condition.getOrder()); + } } } @@ -79,13 +120,9 @@ public class BanyanDBAggregationQueryDAO extends AbstractBanyanDBDAO implements } List<SelectedRecord> serverSideTopN(boolean isColdStage, TopNCondition condition, MetadataRegistry.Schema schema, MetadataRegistry.ColumnSpec valueColumnSpec, - TimestampRange timestampRange, List<KeyValue> additionalConditions) throws IOException { - TopNQueryResponse resp = null; - if (condition.getOrder() == Order.DES) { - resp = topNQueryDebuggable(isColdStage, schema, timestampRange, condition.getTopN(), AbstractQuery.Sort.DESC, additionalConditions, condition.getAttributes()); - } else { - resp = topNQueryDebuggable(isColdStage, schema, timestampRange, condition.getTopN(), AbstractQuery.Sort.ASC, additionalConditions, condition.getAttributes()); - } + TimestampRange timestampRange, List<KeyValue> additionalConditions, String topNRuleName, AbstractQuery.Sort sort) throws IOException { + TopNQueryResponse resp; + resp = topNQueryDebuggable(isColdStage, schema, timestampRange, condition.getTopN(), sort, additionalConditions, condition.getAttributes(), topNRuleName); if (resp.size() == 0) { return Collections.emptyList(); } else if (resp.size() > 1) { // since we have done aggregation, i.e. 
MEAN diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java index ea4f931003..0104aa1177 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import java.io.FileNotFoundException; import java.io.Reader; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import lombok.extern.slf4j.Slf4j; @@ -28,6 +29,7 @@ import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.util.ResourceUtils; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageConfig.TopN; import org.yaml.snakeyaml.Yaml; import static org.apache.skywalking.oap.server.library.util.YamlConfigLoaderUtils.copyProperties; @@ -36,16 +38,25 @@ import static org.apache.skywalking.oap.server.library.util.YamlConfigLoaderUtil @Slf4j public class BanyanDBConfigLoader { private final ModuleProvider moduleProvider; + private BanyanDBStorageConfig config; private final Yaml yaml; + private final Yaml topNYaml; public BanyanDBConfigLoader(final ModuleProvider moduleProvider) { this.moduleProvider = moduleProvider; + this.config = new BanyanDBStorageConfig(); this.yaml = new Yaml(); + this.topNYaml = new Yaml(); } public 
BanyanDBStorageConfig loadConfig() throws ModuleStartException { - BanyanDBStorageConfig config = new BanyanDBStorageConfig(); - Reader applicationReader = null; + loadBaseConfig(); + loadTopNConfig(); + return config; + } + + private void loadBaseConfig() throws ModuleStartException { + Reader applicationReader; try { applicationReader = ResourceUtils.read("bydb.yml"); } catch (FileNotFoundException e) { @@ -53,7 +64,7 @@ public class BanyanDBConfigLoader { } Map<String, Map<String, ?>> configMap = yaml.loadAs(applicationReader, Map.class); if (configMap == null) { - return config; + return; } Map<String, Properties> configProperties = new HashMap<>(); @@ -137,7 +148,6 @@ public class BanyanDBConfigLoader { } catch (IllegalAccessException e) { throw new ModuleStartException("Failed to load BanyanDB configuration.", e); } - return config; } private Properties parseConfig(final Map<String, ?> config) { @@ -181,4 +191,54 @@ public class BanyanDBConfigLoader { groupResource.getAdditionalLifecycleStages().add(cold); } } + + private void loadTopNConfig() throws ModuleStartException { + Reader applicationReader; + try { + applicationReader = ResourceUtils.read("bydb-topn.yml"); + } catch (FileNotFoundException e) { + throw new ModuleStartException("Cannot find the BanyanDB topN configuration file [bydb-topn.yml].", e); + } + Map<String, List<Map<String, ?>>> configMap = yaml.loadAs(applicationReader, Map.class); + if (configMap == null) { + return; + } + List<Map<String, ?>> topNConfig = configMap.get("TopN-Rules"); + for (Map<String, ?> rule : topNConfig) { + TopN topN = new TopN(); + var name = rule.get("name"); + if (name == null) { + throw new ModuleStartException("TopN rule name is missing in file [bydb-topn.yml]."); + } + var metricName = rule.get("metricName"); + if (metricName == null) { + throw new ModuleStartException("TopN rule metricName is missing in file [bydb-topn.yml]."); + } + topN.setName(name.toString()); + var groupByTagNames = 
rule.get("groupByTagNames"); + if (groupByTagNames != null) { + topN.setGroupByTagNames((List<String>) groupByTagNames); + } + var countersNumber = rule.get("countersNumber"); + if (countersNumber != null) { + topN.setLruSizeMinute((int) countersNumber); + } + var lruSizeMinute = rule.get("lruSizeMinute"); + if (lruSizeMinute != null) { + topN.setLruSizeMinute((int) lruSizeMinute); + } + var lruSizeHourDay = rule.get("lruSizeHourDay"); + if (lruSizeHourDay != null) { + topN.setLruSizeMinute((int) lruSizeHourDay); + } + var sort = rule.get("sort"); + if (sort != null) { + topN.setSort(TopN.Sort.valueOf(sort.toString())); + } + Map<String, TopN> map = config.getTopNConfigs().computeIfAbsent(metricName.toString(), k -> new HashMap<>()); + if (map.put(name.toString(), topN) != null) { + throw new ModuleStartException("Duplicate TopN rule name: " + name + " in file [bydb-topn.yml]."); + } + } + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index 90bebd4072..ba6ccf0450 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -195,18 +195,7 @@ public class BanyanDBIndexInstaller extends ModelInstaller { } } final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - try { - defineTopNAggregation(schema, client); - } catch (BanyanDBException ex) { - if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { - log.info("Measure schema {}_{} TopN({}) already created by another OAP node", - 
model.getName(), - model.getDownsampling(), - schema.getTopNSpec()); - } else { - throw ex; - } - } + defineTopNAggregation(schema, client); } } } else { @@ -376,20 +365,22 @@ public class BanyanDBIndexInstaller extends ModelInstaller { } private void defineTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException { - if (schema.getTopNSpec() == null) { + if (CollectionUtils.isEmpty(schema.getTopNSpecs())) { if (schema.getMetadata().getKind() == MetadataRegistry.Kind.MEASURE) { log.debug("skip null TopN Schema for [{}]", schema.getMetadata().name()); } return; } - try { - client.define(schema.getTopNSpec()); - log.info("installed TopN schema for measure {}", schema.getMetadata().name()); - } catch (BanyanDBException ex) { - if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { - log.info("TopNAggregation {} already created by another OAP node", schema.getTopNSpec()); - } else { - throw ex; + for (TopNAggregation topNSpec : schema.getTopNSpecs().values()) { + try { + client.define(topNSpec); + log.info("installed TopN schema for measure {}", schema.getMetadata().name()); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("TopNAggregation {} already created by another OAP node", topNSpec); + } else { + throw ex; + } } } } @@ -628,15 +619,16 @@ public class BanyanDBIndexInstaller extends ModelInstaller { /** * Check if the TopN aggregation exists and update it if necessary. - * If the old TopN aggregation is not in the schema, delete it. + * todo:// can not delete TopN here now. 
*/ private void checkTopNAggregation(Model model, BanyanDBClient client) throws BanyanDBException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - String topNName = MetadataRegistry.Schema.formatTopNName(schema.getMetadata().name()); - TopNAggregation hisTopNAggregation = client.findTopNAggregation(schema.getMetadata().getGroup(), topNName); - - if (schema.getTopNSpec() != null) { - TopNAggregation topNAggregation = schema.getTopNSpec(); + if (schema.getTopNSpecs() == null) { + return; + } + for (TopNAggregation topNAggregation : schema.getTopNSpecs().values()) { + String topNName = topNAggregation.getMetadata().getName(); + TopNAggregation hisTopNAggregation = client.findTopNAggregation(schema.getMetadata().getGroup(), topNName); if (hisTopNAggregation == null) { try { client.define(topNAggregation); @@ -662,14 +654,6 @@ public class BanyanDBIndexInstaller extends ModelInstaller { ); } } - } else { - if (hisTopNAggregation != null) { - client.deleteTopNAggregation(schema.getMetadata().getGroup(), topNName); - log.info( - "delete deprecated TopNAggregation: {} from group: {}", hisTopNAggregation.getMetadata().getName(), - schema.getMetadata().getGroup() - ); - } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java index ea7944371b..e437841883 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java @@ -21,9 +21,13 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import 
com.google.common.base.Splitter; import com.google.common.collect.Iterables; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import lombok.Data; import lombok.Getter; import lombok.Setter; +import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.oap.server.library.module.ModuleConfig; @Getter @@ -42,6 +46,8 @@ public class BanyanDBStorageConfig extends ModuleConfig { private Metadata metadata = new Metadata(); private Property property = new Property(); + private Map<String/*metric name*/, Map<String, TopN>> topNConfigs = new HashMap<>(); + public String[] getTargetArray() { return Iterables.toArray( Splitter.on(",").omitEmptyStrings().trimResults().split(this.global.targets), String.class); @@ -215,4 +221,54 @@ public class BanyanDBStorageConfig extends ModuleConfig { @Setter public static class Property extends BanyanDBStorageConfig.GroupResource { } + + @Data + public static class TopN { + private String name; + /** + * The size of LRU determines the maximally tolerated time range. + * The buffers in the time range are kept in the memory so that + * the data in [T - lruSize * n, T] would be accepted in the pre-aggregation process. + * T = the current time in the current dimensionality. + * n = interval in the current dimensionality. + * lruSizeMinute defines how many time_buckets are held in the memory for minute-level metrics. + * For example, "10" means data points belonging to the latest "10" time_buckets will be persisted. + */ + private int lruSizeMinute = 10; + /** + * lruSizeHourDay defines how many time_buckets are held in the memory for hour-level and day-level metrics. + * For example, "2" means data points belonging to the latest "2" time_buckets will be persisted. + */ + private int lruSizeHourDay = 2; + + /** + * counters_number defines max size of entries in a time window for the pre-aggregation. 
+ */ + private int countersNumber = 1000; + + /** + * groupByTagNames defines the tags to be used for grouping the TopN results. + * If not set, the default is empty, meaning no grouping. + */ + private List<String> groupByTagNames; + + /** + * sort defines the sorting order of the TopN results. + * Default is "all", which means include `des and asc`. + */ + private Sort sort = Sort.all; + + public enum Sort { + all(BanyandbModel.Sort.SORT_UNSPECIFIED), + des(BanyandbModel.Sort.SORT_DESC), + asc(BanyandbModel.Sort.SORT_ASC); + + @Getter + private final BanyandbModel.Sort banyandbSort; + + Sort(final BanyandbModel.Sort sort) { + this.banyandbSort = sort; + } + } + } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 0d922cfc99..513dc3106e 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; +import com.google.common.collect.ImmutableSet; import com.google.gson.JsonObject; import lombok.Builder; import lombok.Data; @@ -42,7 +43,6 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation; -import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import 
org.apache.skywalking.banyandb.v1.client.metadata.Duration; import org.apache.skywalking.oap.server.core.analysis.DownSampling; import org.apache.skywalking.oap.server.core.analysis.metrics.IntList; @@ -142,11 +142,12 @@ public enum MetadataRegistry { MeasureMetadata tagsAndFields = parseTagAndFieldMetadata(model, schemaBuilder, seriesIDColumns, schemaMetadata.group); List<TagFamilySpec> tagFamilySpecs = schemaMetadata.extractTagFamilySpec(tagsAndFields.tags, model.getBanyanDBModelExtension().isStoreIDTag()); // iterate over tagFamilySpecs to save tag names - for (final TagFamilySpec tagFamilySpec : tagFamilySpecs) { - for (final TagSpec tagSpec : tagFamilySpec.getTagsList()) { - schemaBuilder.tag(tagSpec.getName()); - } - } + Set<String> tags = tagFamilySpecs.stream() + .flatMap(tagFamilySpec -> tagFamilySpec.getTagsList().stream()) + + .map(TagSpec::getName) + .collect(Collectors.toSet()); + schemaBuilder.tags(tags); List<IndexRule> indexRules = tagsAndFields.tags.stream() .map(TagMetadata::getIndexRule) .filter(Objects::nonNull) @@ -177,8 +178,11 @@ public enum MetadataRegistry { schemaBuilder.field(field.getName()); } // parse TopN - schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, schemaMetadata.name())); - + schemaBuilder.topNSpecs(parseTopNSpecs( + model, schemaMetadata.group, schemaMetadata.name(), + config.getTopNConfigs().get(model.getName()), + tags + )); registry.put(schemaMetadata.name(), schemaBuilder.build()); return new MeasureModel(builder.build(), indexRules); } @@ -196,9 +200,12 @@ public enum MetadataRegistry { return new PropertyModel(builder.build()); } - private TopNAggregation parseTopNSpec(final Model model, final String group, final String measureName) - throws StorageException { - if (model.getBanyanDBModelExtension().getTopN() == null) { + private Map<ImmutableSet<String>, TopNAggregation> parseTopNSpecs(final Model model, + final String group, + final String measureName, + final Map<String, BanyanDBStorageConfig.TopN> 
topNConfig, + final Set<String> tags) throws StorageException { + if (topNConfig == null) { return null; } @@ -207,20 +214,45 @@ public enum MetadataRegistry { // skip non-single valued metrics return null; } + Map<ImmutableSet<String>, TopNAggregation> topNAggregations = new HashMap<>(); + topNConfig.forEach((name, topN) -> { + ImmutableSet<String> key = ImmutableSet.of(); + TopNAggregation.Builder topNAggregation = TopNAggregation.newBuilder() + .setMetadata( + Metadata.newBuilder().setGroup(group).setName(name)) + .setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName)) + .setFieldValueSort(topN.getSort().getBanyandbSort()) + .setFieldName(valueColumnOpt.get().getValueCName()) + .setCountersNumber(topN.getCountersNumber()); + if (topN.getGroupByTagNames() != null) { + key = ImmutableSet.copyOf(topN.getGroupByTagNames()); + //check tags + topN.getGroupByTagNames().forEach(tag -> { + if (!tags.contains(tag)) { + throw new IllegalArgumentException( + "In file [bydb-topn.yml], TopN rule " + topN.getName() + "'s groupByTagName [" + tag + "] is not defined in metric " + model.getName()); + } + }); + topNAggregation.addAllGroupByTagNames(topN.getGroupByTagNames()); + } - if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) { - throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()); - } - return TopNAggregation.newBuilder() - .setMetadata( - Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName))) - .setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName)) - .setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN - .setFieldName(valueColumnOpt.get().getValueCName()) - .addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames()) - .setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber()) - 
.setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize()) - .build(); + switch (model.getDownsampling()) { + case Minute: + topNAggregation.setLruSize(topN.getLruSizeMinute()); + break; + case Hour: + case Day: + topNAggregation.setLruSize(topN.getLruSizeHourDay()); + break; + default: + throw new UnsupportedOperationException("unsupported downsampling: " + model.getDownsampling()); + } + if (topNAggregations.containsKey(key)) { + throw new IllegalArgumentException("In file [bydb-topn.yml], TopN rule " + topN.getName() + "'s groupByTagNames " + key + " already exist in the same metric " + model.getName()); + } + topNAggregations.put(key, topNAggregation.build()); + }); + return topNAggregations; } public Schema findMetadata(final Model model) { @@ -690,15 +722,11 @@ public enum MetadataRegistry { @Getter @Nullable - private final TopNAggregation topNSpec; + private final Map<ImmutableSet<String>/*groupBy tags*/, TopNAggregation> topNSpecs; public ColumnSpec getSpec(String columnName) { return this.specs.get(columnName); } - - public static String formatTopNName(String measureName) { - return measureName + "_topn"; - } } @RequiredArgsConstructor diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java index 2cdb5b158a..aed0ca700b 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java @@ -19,7 +19,6 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream; import com.google.gson.Gson; 
-import java.util.Objects; import javax.annotation.Nullable; import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.banyandb.v1.client.AbstractCriteria; @@ -145,7 +144,8 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli int number, AbstractQuery.Sort sort, List<KeyValue> additionalConditions, - List<AttrCondition> attributes) throws IOException { + List<AttrCondition> attributes, + String topNRuleName) throws IOException { DebuggingTraceContext traceContext = DebuggingTraceContext.TRACE_CONTEXT.get(); DebuggingSpan span = null; try { @@ -155,6 +155,8 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli builder.append("Condition: ") .append("Schema: ") .append(schema) + .append(", TopNRuleName: ") + .append(topNRuleName) .append(", TimestampRange: ") .append(timestampRange) .append(", Number: ") @@ -169,7 +171,7 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli .append(isColdStage); span.setMsg(builder.toString()); } - TopNQueryResponse response = topNQuery(isColdStage, schema, timestampRange, number, sort, additionalConditions, attributes); + TopNQueryResponse response = topNQuery(isColdStage, schema, timestampRange, number, sort, additionalConditions, attributes, topNRuleName); if (traceContext != null && traceContext.isDumpStorageRsp()) { builder.append("\n").append(" Response: ").append(new Gson().toJson(response.getTopNLists())); span.setMsg(builder.toString()); @@ -188,9 +190,10 @@ public abstract class AbstractBanyanDBDAO extends AbstractDAO<BanyanDBStorageCli int number, AbstractQuery.Sort sort, List<KeyValue> additionalConditions, - List<AttrCondition> attributes) throws IOException { - final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull( - schema.getTopNSpec()).getMetadata().getName(), + List<AttrCondition> attributes, + String topNRuleName) throws IOException { + final 
TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), + topNRuleName, timestampRange, number, sort); q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java index fe78339cbe..d47e265105 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIT.java @@ -280,7 +280,7 @@ public class BanyanDBIT { private static class TestMetric { @Column(name = "service_id") @BanyanDB.SeriesID(index = 0) - @BanyanDB.TopNAggregation + @BanyanDB.ShardingKey(index = 0) private String serviceId; @Column(name = "tag") @BanyanDB.MatchQuery(analyzer = BanyanDB.MatchQuery.AnalyzerType.URL)