This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 544a601d Feature 12893/add topn subcommand (#656)
544a601d is described below
commit 544a601d0c71c0aa5fa259d800854330b1abb800
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Apr 29 15:37:10 2025 +0800
Feature 12893/add topn subcommand (#656)
* [Feature] Add topn subcommand to bydbctl for querying TopN aggregation
results #12893
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/measure/topn.go | 5 +-
bydbctl/internal/cmd/root.go | 2 +-
bydbctl/internal/cmd/topn.go | 163 ++++++++++
bydbctl/internal/cmd/topn_test.go | 360 +++++++++++++++++++++
docs/concept/data-model.md | 2 +-
.../interacting/bydbctl/query/top-n-aggregation.md | 124 +++++++
.../bydbctl/schema/top-n-aggregation.md | 163 ++++++++++
docs/menu.yml | 4 +
8 files changed, 820 insertions(+), 3 deletions(-)
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 48977980..a53624cf 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -454,12 +454,15 @@ func (manager *topNProcessorManager) start(topNSchema
*databasev1.TopNAggregatio
func (manager *topNProcessorManager) removeProcessors(topNSchema
*databasev1.TopNAggregation) []*topNStreamingProcessor {
var processors []*topNStreamingProcessor
+ var newList []*topNStreamingProcessor
for i := range manager.processorList {
if manager.processorList[i].topNSchema.GetMetadata().GetName()
== topNSchema.GetMetadata().GetName() {
processors = append(processors,
manager.processorList[i])
- manager.processorList =
append(manager.processorList[:i], manager.processorList[i+1:]...)
+ } else {
+ newList = append(newList, manager.processorList[i])
}
}
+ manager.processorList = newList
return processors
}
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index 94823ddd..d167c221 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -69,7 +69,7 @@ func RootCmdFlags(command *cobra.Command) {
_ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
viper.SetDefault("addr", "http://localhost:17913")
- command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(),
newMeasureCmd(),
+ command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(),
newMeasureCmd(), newTopnCmd(),
newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(),
newHealthCheckCmd(), newAnalyzeCmd())
}
diff --git a/bydbctl/internal/cmd/topn.go b/bydbctl/internal/cmd/topn.go
new file mode 100644
index 00000000..d7fa90a9
--- /dev/null
+++ b/bydbctl/internal/cmd/topn.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+ "fmt"
+
+ "github.com/go-resty/resty/v2"
+ "github.com/spf13/cobra"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/version"
+)
+
+const topnSchemaPath = "/api/v1/topn-agg/schema"
+
+var topnSchemaPathWithParams = topnSchemaPath + pathTemp
+
+func newTopnCmd() *cobra.Command {
+ topnCmd := &cobra.Command{
+ Use: "topn",
+ Version: version.Build(),
+ Short: "Topn operation",
+ }
+
+ // e.g. http://127.0.0.1:17913/api/v1/topn-agg/schema
+ createCmd := &cobra.Command{
+ Use: "create -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Create topn from files",
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ s := new(databasev1.TopNAggregation)
+ err :=
protojson.Unmarshal(request.data, s)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&databasev1.TopNAggregationRegistryServiceCreateRequest{
+ TopNAggregation: s,
+ }
+ b, err := protojson.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return
request.req.SetBody(b).Post(getPath(topnSchemaPath))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("topn %s.%s is created",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ }, enableTLS, insecure, cert)
+ },
+ }
+
+ // e.g.
http://127.0.0.1:17913/api/v1/topn-agg/schema/{sw_metric}/{my_measure_topn}
+ updateCmd := &cobra.Command{
+ Use: "update -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Update topn from files",
+ RunE: func(cmd *cobra.Command, _ []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ s := new(databasev1.TopNAggregation)
+ err :=
protojson.Unmarshal(request.data, s)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&databasev1.TopNAggregationRegistryServiceUpdateRequest{
+ TopNAggregation: s,
+ }
+ b, err := protojson.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return request.req.SetBody(b).
+ SetPathParam("name",
request.name).SetPathParam("group", request.group).
+
Put(getPath(topnSchemaPathWithParams))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("topn %s.%s is updated",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ }, enableTLS, insecure, cert)
+ },
+ }
+
+ // e.g.
http://127.0.0.1:17913/api/v1/topn-agg/schema/{sw_metric}/{my_measure_topn}
+ getCmd := &cobra.Command{
+ Use: "get [-g group] -n name",
+ Version: version.Build(),
+ Short: "Get a topn",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(topnSchemaPathWithParams))
+ }, yamlPrinter, enableTLS, insecure, cert)
+ },
+ }
+
+ deleteCmd := &cobra.Command{
+ Use: "delete [-g group] -n name",
+ Version: version.Build(),
+ Short: "Delete a topn",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(topnSchemaPathWithParams))
+ }, func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("topn %s.%s is deleted",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ }, enableTLS, insecure, cert)
+ },
+ }
+ bindNameFlag(getCmd, deleteCmd)
+
+ // e.g. http://127.0.0.1:17913/api/v1/topn-agg/schema/lists/{sw_metric}
+ listCmd := &cobra.Command{
+ Use: "list [-g group]",
+ Version: version.Build(),
+ Short: "List topn",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/topn-agg/schema/lists/{group}"))
+ }, yamlPrinter, enableTLS, insecure, cert)
+ },
+ }
+
+ // e.g. http://127.0.0.1:17913/api/v1/measure/topn
+ queryCmd := &cobra.Command{
+ Use: "query [-s start_time] [-e end_time] -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Query data in a topn",
+ Long: timeRangeUsage,
+ RunE: func(cmd *cobra.Command, _ []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/topn"))
+ }, yamlPrinter, enableTLS, insecure, cert)
+ },
+ }
+ bindFileFlag(createCmd, updateCmd, queryCmd)
+ bindTimeRangeFlag(queryCmd)
+
+ bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
+ topnCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
+ return topnCmd
+}
diff --git a/bydbctl/internal/cmd/topn_test.go
b/bydbctl/internal/cmd/topn_test.go
new file mode 100644
index 00000000..73ffd6aa
--- /dev/null
+++ b/bydbctl/internal/cmd/topn_test.go
@@ -0,0 +1,360 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ cases_measure_data
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = Describe("Topn Schema Operation", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.EmptyStandalone()
+ addr = httpSchema + addr
+ // extracting the operation of creating topn schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
+ createGroup := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_MEASURE
+resource_opts:
+ shard_num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution
fails:%v", err)
+ }
+ })
+ }
+ Eventually(createGroup,
flags.EventuallyTimeout).Should(ContainSubstring("group group1 is created"))
+ rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f",
"-"})
+ createMeasure := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1
+tag_families:
+ - name: default
+ tags:
+ - name: id
+ type: TAG_TYPE_STRING
+ - name: entity_id
+ type: TAG_TYPE_STRING
+fields:
+ - name: total
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
+ - name: value
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
+entity:
+ tag_names:
+ - id`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution
fails:%v", err)
+ }
+ })
+ }
+ Eventually(createMeasure,
flags.EventuallyTimeout).Should(ContainSubstring("measure group1.name1 is
created"))
+ rootCmd.SetArgs([]string{"topn", "create", "-a", addr, "-f",
"-"})
+ createTopn := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1
+source_measure:
+ name: name1
+ group: group1
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+ - id
+counters_number: 10000
+lru_size: 10`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution
fails:%v", err)
+ }
+ })
+ }
+ Eventually(createTopn,
flags.EventuallyTimeout).Should(ContainSubstring("topn group1.name2 is
created"))
+ })
+
+ It("get topn schema", func() {
+ rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n",
"name2"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp :=
new(databasev1.TopNAggregationRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.TopNAggregation.Metadata.Group).To(Equal("group1"))
+ Expect(resp.TopNAggregation.Metadata.Name).To(Equal("name2"))
+ })
+
+ It("update topn schema", func() {
+ rootCmd.SetArgs([]string{"topn", "update", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1
+source_measure:
+ name: name1
+ group: group1
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+ - id
+ - entity_id
+counters_number: 10000
+lru_size: 10`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("topn group1.name2 is updated"))
+ rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n",
"name2"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp :=
new(databasev1.TopNAggregationRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.TopNAggregation.Metadata.Group).To(Equal("group1"))
+ Expect(resp.TopNAggregation.Metadata.Name).To(Equal("name2"))
+
Expect(resp.TopNAggregation.GroupByTagNames[1]).To(Equal("entity_id"))
+ })
+
+ It("delete topn schema", func() {
+ // delete
+ rootCmd.SetArgs([]string{"topn", "delete", "-g", "group1",
"-n", "name2"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("topn group1.name2 is deleted"))
+ // get again
+ rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n",
"name2"})
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
+ })
+
+ It("list topn schema", func() {
+ // create another topn schema for list operation
+ rootCmd.SetArgs([]string{"topn", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name3
+ group: group1
+source_measure:
+ name: name1
+ group: group1
+field_name: value
+field_value_sort: SORT_DESC
+group_by_tag_names:
+ - id
+counters_number: 10000
+lru_size: 10`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("topn group1.name3 is created"))
+ // list
+ rootCmd.SetArgs([]string{"topn", "list", "-g", "group1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp :=
new(databasev1.TopNAggregationRegistryServiceListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.TopNAggregation).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
+
+var _ = Describe("Topn Data Query", func() {
+ var addr, grpcAddr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ var now time.Time
+ var startStr, endStr string
+ var interval time.Duration
+ BeforeEach(func() {
+ var err error
+ now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
+ Expect(err).NotTo(HaveOccurred())
+ startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
+ interval = 1 * time.Millisecond
+ endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
+ grpcAddr, addr, deferFunc = setup.Standalone()
+ addr = httpSchema + addr
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ })
+
+ It("query all topn data", func() {
+ conn, err := grpclib.NewClient(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ cases_measure_data.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+ rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f",
"-"})
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+name: service_instance_cpm_minute
+groups: ["sw_metric"]
+timeRange:
+ begin: %s
+ end: %s
+tagProjection:
+ tagFamilies:
+ - name: default
+ tags:
+ - id
+ - entity_id
+ - service_id`, startStr, endStr)))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue,
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ GinkgoWriter.Println(out)
+ resp := new(measurev1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.DataPoints)
+ }, flags.EventuallyTimeout).Should(Equal(6))
+
+ rootCmd.SetArgs([]string{"topn", "query", "-a", addr, "-f",
"-"})
+ issue1 := func() string {
+ rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+name: service_instance_cpm_minute_top_bottom_100
+groups: ["sw_metric"]
+timeRange:
+ begin: %s
+ end: %s
+topN: 3
+agg: 2
+fieldValueSort: 1`, startStr, endStr)))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue1,
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue1()
+ GinkgoWriter.Println(out)
+ resp := new(measurev1.TopNResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Lists[0].Items)
+ }, flags.EventuallyTimeout).Should(Equal(3))
+ })
+
+ DescribeTable("query topn data with time range flags", func(timeArgs
...string) {
+ conn, err := grpclib.NewClient(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now := timestamp.NowMilli()
+ interval := time.Minute
+ cases_measure_data.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+
+ args := []string{"topn", "query", "-a", addr}
+ args = append(args, timeArgs...)
+ args = append(args, "-f", "-")
+ rootCmd.SetArgs(args)
+ issue1 := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+name: service_instance_cpm_minute_top_bottom_100
+groups: ["sw_metric"]
+topN: 3
+agg: 2
+fieldValueSort: 1`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue1,
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue1()
+ GinkgoWriter.Println(out)
+ resp := new(measurev1.TopNResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Lists[0].Items)
+ }, flags.EventuallyTimeout).Should(Equal(3))
+ },
+ Entry("relative start", "--start", "-30m"),
+ Entry("relative end", "--end", "0m"),
+ Entry("absolute start", "--start", startStr),
+ Entry("absolute end", "--end", endStr),
+ Entry("default"),
+ Entry("all relative", "--start", "-30m", "--end", "0m"),
+ Entry("all absolute", "--start", startStr, "--end", endStr),
+ )
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/docs/concept/data-model.md b/docs/concept/data-model.md
index 7dc8c615..00a6a522 100644
--- a/docs/concept/data-model.md
+++ b/docs/concept/data-model.md
@@ -164,7 +164,7 @@ field_name: value
field_value_sort: SORT_UNSPECIFIED
group_by_tag_names:
- entity_id
-counters_number: 10000
+counters_number: 1000
lru_size: 10
```
diff --git a/docs/interacting/bydbctl/query/top-n-aggregation.md
b/docs/interacting/bydbctl/query/top-n-aggregation.md
new file mode 100644
index 00000000..7c20a87c
--- /dev/null
+++ b/docs/interacting/bydbctl/query/top-n-aggregation.md
@@ -0,0 +1,124 @@
+# Query [TopNAggregation](../../../concept/data-model.md#topnaggregation)
+
+Query operation queries the data in a top-n-aggregation.
+
+[bydbctl](../bydbctl.md) is the command line tool in examples.
+
+The input contains two parts:
+
+- Request: a YAML-based text which is defined by the [API](#api-reference)
+- Time Range: YAML and CLI's flags both support it.
+
+## Time Range
+
+The query specification contains a `time_range` field. The request should set
absolute times in it. `bydbctl` also provides `start` and `end` flags to
support passing absolute and relative times.
+
+"start" and "end" specify a time range during which the query is performed,
they can be an absolute time like
["2006-01-02T15:04:05Z07:00"](https://www.rfc-editor.org/rfc/rfc3339),
+or relative time (to the current time) like "-30m", or "30m".
+They are both optional and their default values follow the rules below:
+
+- when "start" and "end" are both absent, "start = now - 30 minutes" and "end
= now",
+ namely past 30 minutes;
+- when "start" is absent and "end" is present, this command calculates "start"
(minus 30 units),
+ e.g. "end = 2022-11-09T12:34:00Z", so "start = end - 30 minutes =
2022-11-09T12:04:00Z";
+- when "start" is present and "end" is absent, this command calculates "end"
(plus 30 units),
+ e.g. "start = 2022-11-09T12:04:00Z", so "end = start + 30 minutes =
2022-11-09T12:34:00Z".
+
+## Understand the schema you are querying
+
+Before querying the data, you need to know the top-n-aggregation name, source
measure and other basic information in the top-n-aggregation. You can use the
`bydbctl topn get` command to get the top-n-aggregation schema.
+If you want to get the schema of a top-n-aggregation named
`service_instance_cpm_minute_top_bottom_100` in the group `sw_metric`, you can
use the below command:
+
+```shell
+bydbctl topn get -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+```shell
+topNAggregation:
+ countersNumber: 1000
+ criteria: null
+ fieldName: value
+ fieldValueSort: SORT_UNSPECIFIED
+ groupByTagNames:
+ - service_id
+ lruSize: 10
+ metadata:
+ group: sw_metric
+ name: service_instance_cpm_minute_top_bottom_100
+ sourceMeasure:
+ group: sw_metric
+ name: service_instance_cpm_minute
+ updatedAt: null
+```
+
+## Examples
+
+The following examples use the above schema to show how to query data in a
top-n-aggregation and cover some common use cases:
+
+### Query between specific time range
+
+To retrieve a series of items between `2021-09-01T23:10:00+08:00` and
`2021-09-01T23:35:00+08:00`, use the command below. These items also choose a
tag `service_id` which lives in a family named `default`. The fieldValueSort is
an enumeration type, with `0` for SORT_UNSPECIFIED, `1` for SORT_DESC, and `2`
for SORT_ASC.
+
+```shell
+bydbctl topn query -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["service_id"]
+timeRange:
+ begin: 2021-09-01T23:10:00+08:00
+ end: 2021-09-01T23:35:00+08:00
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+EOF
+```
+
+### Query using relative time duration
+
+The command below queries data from the last 30 minutes using a relative time
duration:
+
+```shell
+bydbctl topn query --start -30m -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["service_id"]
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+EOF
+```
+
+### Query with filter
+
+The command below queries data with a filter where the `service_id` is
`svc_1`:
+
+```shell
+bydbctl topn query -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+conditions:
+ - name: "service_id"
+ op: "BINARY_OP_EQ"
+ value:
+ str:
+ value: "svc_1"
+EOF
+```
+Note: Only tags defined in `groupByTagNames` and used with the `EQ` operation
are supported for filtering.
+
+More filter operations can be found in [here](filter-operation.md).
+
+### More examples can be found in
[here](https://github.com/apache/skywalking-banyandb/tree/main/test/cases/topn/data/input).
+
+## API Reference
+
+[MeasureService v1](../../../api-reference.md#measureservice)
diff --git a/docs/interacting/bydbctl/schema/top-n-aggregation.md
b/docs/interacting/bydbctl/schema/top-n-aggregation.md
new file mode 100644
index 00000000..192e4a56
--- /dev/null
+++ b/docs/interacting/bydbctl/schema/top-n-aggregation.md
@@ -0,0 +1,163 @@
+# CRUD [TopNAggregation](../../../concept/data-model.md#topnaggregation)
+
+CRUD operations create, read, update and delete top-n-aggregations.
+
+[bydbctl](../bydbctl.md) is the command line tool in examples.
+
+Finding the Top-N entities from a dataset in a time range is a common scenario.
Top-n-aggregation aims to pre-calculate the top/bottom entities during the
measure writing phase.
+
+## Create operation
+
+Create operation adds a new top-n-aggregation to the database's metadata
registry repository. If the top-n-aggregation does not currently exist, create
operation will create the schema.
+
+### Examples of creating
+
+A top-n-aggregation belongs to a unique group. We should create such a group
with a catalog `CATALOG_MEASURE` before creating a top-n-aggregation.
+
+```shell
+bydbctl group create -f - <<EOF
+metadata:
+ name: sw_metric
+catalog: CATALOG_MEASURE
+resource_opts:
+ shard_num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7
+EOF
+```
+
+The group creates two shards to store data points. Every day, it would create a
+segment that will generate a block every 2 hours.
+
+The data in this group will be kept for 7 days.
+
+The top-n-aggregation monitors the data ingestion of the source measure while
generating ranked top-N entities. We should create such a measure before
creating a top-n-aggregation.
+
+```shell
+bydbctl measure create -f - <<EOF
+metadata:
+ name: service_instance_cpm_minute
+ group: sw_metric
+tag_families:
+ - name: default
+ tags:
+ - name: id
+ type: TAG_TYPE_STRING
+ - name: entity_id
+ type: TAG_TYPE_STRING
+ - name: service_id
+ type: TAG_TYPE_STRING
+fields:
+ - name: total
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
+ - name: value
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
+entity:
+ tagNames:
+ - service_id
+ - entity_id
+sharding_key:
+ tagNames:
+ - service_id
+interval: 1m
+EOF
+```
+
+`service_instance_cpm_minute` expects to ingest per-minute metrics for
individual service instances.
+
+Then, the below command will create a new top-n-aggregation:
+
+```shell
+bydbctl topn create -f - <<EOF
+metadata:
+ name: service_instance_cpm_minute_top_bottom_100
+ group: sw_metric
+source_measure:
+ name: service_instance_cpm_minute
+ group: sw_metric
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+ - service_id
+counters_number: 1000
+lru_size: 10
+EOF
+```
+`service_instance_cpm_minute_top_bottom_100` is watching the data ingestion of
the source measure `service_instance_cpm_minute` to generate both top 1000 and
bottom 1000 entity cardinalities. If only Top 1000 or Bottom 1000 is needed,
the `field_value_sort` could be `SORT_DESC` or `SORT_ASC` respectively.
+
+- `SORT_DESC`: Top-N. In a series of `1,2,3...1000`. Top10’s result is
`1000,999...991`.
+- `SORT_ASC`: Bottom-N. In a series of `1,2,3...1000`. Bottom10’s result is
`1,2...10`.
+
+Tags in `group_by_tag_names` are used as dimensions. These tags can be
searched (only equality is supported) in the query phase. Tags that do not
exist in `group_by_tag_names` will be dropped in the pre-calculating phase.
+
+`counters_number` denotes the number of entity cardinality. As the above
example shows, calculating the Top 100 among 10 thousand is easier than among
10 million.
+
+`lru_size` is a late data optimizing flag. The higher the number, the more
late data, but the more memory space is consumed.
+
+More top-n-aggregation information can be found in
[here](../../../concept/data-model.md#topnaggregation).
+
+## Get operation
+
+Get(Read) operation gets a top-n-aggregation's schema.
+
+### Examples of getting
+
+```shell
+bydbctl topn get -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+## Update operation
+
+Update operation changes a top-n-aggregation's schema.
+
+### Examples of updating
+
+```shell
+bydbctl topn update -f - <<EOF
+metadata:
+ name: service_instance_cpm_minute_top_bottom_100
+ group: sw_metric
+source_measure:
+ name: service_instance_cpm_minute
+ group: sw_metric
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+ - service_id
+ - entity_id
+counters_number: 1000
+lru_size: 10
+EOF
+```
+
+## Delete operation
+
+Delete operation removes a top-n-aggregation's schema.
+
+### Examples of deleting
+
+```shell
+bydbctl topn delete -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+## List operation
+
+The list operation shows the schemas of all top-n-aggregations in a group.
+
+### Examples of listing
+
+```shell
+bydbctl topn list -g sw_metric
+```
+
+## API Reference
+
+[TopNAggregation Registration
Operations](../../../api-reference.md#topnaggregationregistryservice)
diff --git a/docs/menu.yml b/docs/menu.yml
index 779ad336..4e2d28ec 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -57,6 +57,8 @@ catalog:
path: "/interacting/bydbctl/schema/index-rule"
- name: "IndexRuleBinding"
path: "/interacting/bydbctl/schema/index-rule-binding"
+ - name: "Top N Aggregation"
+ path: "/interacting/bydbctl/schema/top-n-aggregation"
- name: "Querying Data"
catalog:
- name: "Measure"
@@ -65,6 +67,8 @@ catalog:
path: "/interacting/bydbctl/query/stream"
- name: "Filter Operation"
path: "/interacting/bydbctl/query/filter-operation"
+ - name: "Top N Aggregation"
+ path: "/interacting/bydbctl/query/top-n-aggregation"
- name: "CRUD Property"
path: "/interacting/bydbctl/property"
- name: "Analyzing Data"