This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 544a601d Feature 12893/add topn subcommand (#656)
544a601d is described below

commit 544a601d0c71c0aa5fa259d800854330b1abb800
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Apr 29 15:37:10 2025 +0800

    Feature 12893/add topn subcommand (#656)
    
    * [Feature] Add topn subcommand to bydbctl for querying TopN aggregation 
results #12893
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/measure/topn.go                            |   5 +-
 bydbctl/internal/cmd/root.go                       |   2 +-
 bydbctl/internal/cmd/topn.go                       | 163 ++++++++++
 bydbctl/internal/cmd/topn_test.go                  | 360 +++++++++++++++++++++
 docs/concept/data-model.md                         |   2 +-
 .../interacting/bydbctl/query/top-n-aggregation.md | 124 +++++++
 .../bydbctl/schema/top-n-aggregation.md            | 163 ++++++++++
 docs/menu.yml                                      |   4 +
 8 files changed, 820 insertions(+), 3 deletions(-)

diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 48977980..a53624cf 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -454,12 +454,15 @@ func (manager *topNProcessorManager) start(topNSchema 
*databasev1.TopNAggregatio
 
 func (manager *topNProcessorManager) removeProcessors(topNSchema 
*databasev1.TopNAggregation) []*topNStreamingProcessor {
        var processors []*topNStreamingProcessor
+       var newList []*topNStreamingProcessor
        for i := range manager.processorList {
                if manager.processorList[i].topNSchema.GetMetadata().GetName() 
== topNSchema.GetMetadata().GetName() {
                        processors = append(processors, 
manager.processorList[i])
-                       manager.processorList = 
append(manager.processorList[:i], manager.processorList[i+1:]...)
+               } else {
+                       newList = append(newList, manager.processorList[i])
                }
        }
+       manager.processorList = newList
        return processors
 }
 
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index 94823ddd..d167c221 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -69,7 +69,7 @@ func RootCmdFlags(command *cobra.Command) {
        _ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
        viper.SetDefault("addr", "http://localhost:17913")
 
-       command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), 
newMeasureCmd(),
+       command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), 
newMeasureCmd(), newTopnCmd(),
                newIndexRuleCmd(), newIndexRuleBindingCmd(), newPropertyCmd(), 
newHealthCheckCmd(), newAnalyzeCmd())
 }
 
diff --git a/bydbctl/internal/cmd/topn.go b/bydbctl/internal/cmd/topn.go
new file mode 100644
index 00000000..d7fa90a9
--- /dev/null
+++ b/bydbctl/internal/cmd/topn.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+       "fmt"
+
+       "github.com/go-resty/resty/v2"
+       "github.com/spf13/cobra"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/version"
+)
+
+const topnSchemaPath = "/api/v1/topn-agg/schema"
+
+var topnSchemaPathWithParams = topnSchemaPath + pathTemp
+
+func newTopnCmd() *cobra.Command {
+       topnCmd := &cobra.Command{
+               Use:     "topn",
+               Version: version.Build(),
+               Short:   "Topn operation",
+       }
+
+       // e.g. http://127.0.0.1:17913/api/v1/topn-agg/schema
+       createCmd := &cobra.Command{
+               Use:     "create -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Create topn from files",
+               RunE: func(cmd *cobra.Command, _ []string) error {
+                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       s := new(databasev1.TopNAggregation)
+                                       err := 
protojson.Unmarshal(request.data, s)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&databasev1.TopNAggregationRegistryServiceCreateRequest{
+                                               TopNAggregation: s,
+                                       }
+                                       b, err := protojson.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return 
request.req.SetBody(b).Post(getPath(topnSchemaPath))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("topn %s.%s is created", 
reqBody.group, reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               }, enableTLS, insecure, cert)
+               },
+       }
+
+       // e.g. 
http://127.0.0.1:17913/api/v1/topn-agg/schema/{sw_metric}/{my_measure_topn}
+       updateCmd := &cobra.Command{
+               Use:     "update -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Update topn from files",
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       s := new(databasev1.TopNAggregation)
+                                       err := 
protojson.Unmarshal(request.data, s)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&databasev1.TopNAggregationRegistryServiceUpdateRequest{
+                                               TopNAggregation: s,
+                                       }
+                                       b, err := protojson.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return request.req.SetBody(b).
+                                               SetPathParam("name", 
request.name).SetPathParam("group", request.group).
+                                               
Put(getPath(topnSchemaPathWithParams))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("topn %s.%s is updated", 
reqBody.group, reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               }, enableTLS, insecure, cert)
+               },
+       }
+
+       // e.g. 
http://127.0.0.1:17913/api/v1/topn-agg/schema/{sw_metric}/{my_measure_topn}
+       getCmd := &cobra.Command{
+               Use:     "get [-g group] -n name",
+               Version: version.Build(),
+               Short:   "Get a topn",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Get(getPath(topnSchemaPathWithParams))
+                       }, yamlPrinter, enableTLS, insecure, cert)
+               },
+       }
+
+       deleteCmd := &cobra.Command{
+               Use:     "delete [-g group] -n name",
+               Version: version.Build(),
+               Short:   "Delete a topn",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Delete(getPath(topnSchemaPathWithParams))
+                       }, func(_ int, reqBody reqBody, _ []byte) error {
+                               fmt.Printf("topn %s.%s is deleted", 
reqBody.group, reqBody.name)
+                               fmt.Println()
+                               return nil
+                       }, enableTLS, insecure, cert)
+               },
+       }
+       bindNameFlag(getCmd, deleteCmd)
+
+       // e.g. http://127.0.0.1:17913/api/v1/topn-agg/schema/lists/{sw_metric}
+       listCmd := &cobra.Command{
+               Use:     "list [-g group]",
+               Version: version.Build(),
+               Short:   "List topn",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/topn-agg/schema/lists/{group}"))
+                       }, yamlPrinter, enableTLS, insecure, cert)
+               },
+       }
+
+       // e.g. http://127.0.0.1:17913/api/v1/measure/topn
+       queryCmd := &cobra.Command{
+               Use:     "query [-s start_time] [-e end_time] -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Query data in a topn",
+               Long:    timeRangeUsage,
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/topn"))
+                               }, yamlPrinter, enableTLS, insecure, cert)
+               },
+       }
+       bindFileFlag(createCmd, updateCmd, queryCmd)
+       bindTimeRangeFlag(queryCmd)
+
+       bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       topnCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       return topnCmd
+}
diff --git a/bydbctl/internal/cmd/topn_test.go 
b/bydbctl/internal/cmd/topn_test.go
new file mode 100644
index 00000000..73ffd6aa
--- /dev/null
+++ b/bydbctl/internal/cmd/topn_test.go
@@ -0,0 +1,360 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+       "fmt"
+       "strings"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/spf13/cobra"
+       "github.com/zenizh/go-capturer"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = Describe("Topn Schema Operation", func() {
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       BeforeEach(func() {
+               _, addr, deferFunc = setup.EmptyStandalone()
+               addr = httpSchema + addr
+               // extracting the operation of creating topn schema
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
+               createGroup := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group1
+catalog: CATALOG_MEASURE
+resource_opts:
+  shard_num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(createGroup, 
flags.EventuallyTimeout).Should(ContainSubstring("group group1 is created"))
+               rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f", 
"-"})
+               createMeasure := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name1
+  group: group1
+tag_families:
+  - name: default
+    tags:
+    - name: id
+      type: TAG_TYPE_STRING
+    - name: entity_id
+      type: TAG_TYPE_STRING
+fields:
+  - name: total
+    field_type: FIELD_TYPE_INT
+    encoding_method: ENCODING_METHOD_GORILLA
+    compression_method: COMPRESSION_METHOD_ZSTD
+  - name: value
+    field_type: FIELD_TYPE_INT
+    encoding_method: ENCODING_METHOD_GORILLA
+    compression_method: COMPRESSION_METHOD_ZSTD
+entity:
+  tag_names:
+  - id`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(createMeasure, 
flags.EventuallyTimeout).Should(ContainSubstring("measure group1.name1 is 
created"))
+               rootCmd.SetArgs([]string{"topn", "create", "-a", addr, "-f", 
"-"})
+               createTopn := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name2
+  group: group1
+source_measure:
+  name: name1
+  group: group1
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+  - id
+counters_number: 10000
+lru_size: 10`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(createTopn, 
flags.EventuallyTimeout).Should(ContainSubstring("topn group1.name2 is 
created"))
+       })
+
+       It("get topn schema", func() {
+               rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n", 
"name2"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := 
new(databasev1.TopNAggregationRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.TopNAggregation.Metadata.Group).To(Equal("group1"))
+               Expect(resp.TopNAggregation.Metadata.Name).To(Equal("name2"))
+       })
+
+       It("update topn schema", func() {
+               rootCmd.SetArgs([]string{"topn", "update", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name2
+  group: group1
+source_measure:
+  name: name1
+  group: group1
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+  - id
+  - entity_id
+counters_number: 10000
+lru_size: 10`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("topn group1.name2 is updated"))
+               rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n", 
"name2"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := 
new(databasev1.TopNAggregationRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.TopNAggregation.Metadata.Group).To(Equal("group1"))
+               Expect(resp.TopNAggregation.Metadata.Name).To(Equal("name2"))
+               
Expect(resp.TopNAggregation.GroupByTagNames[1]).To(Equal("entity_id"))
+       })
+
+       It("delete topn schema", func() {
+               // delete
+               rootCmd.SetArgs([]string{"topn", "delete", "-g", "group1", 
"-n", "name2"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("topn group1.name2 is deleted"))
+               // get again
+               rootCmd.SetArgs([]string{"topn", "get", "-g", "group1", "-n", 
"name2"})
+               err := rootCmd.Execute()
+               Expect(err).To(MatchError("rpc error: code = NotFound desc = 
banyandb: resource not found"))
+       })
+
+       It("list topn schema", func() {
+               // create another topn schema for list operation
+               rootCmd.SetArgs([]string{"topn", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name3
+  group: group1
+source_measure:
+  name: name1
+  group: group1
+field_name: value
+field_value_sort: SORT_DESC
+group_by_tag_names:
+  - id
+counters_number: 10000
+lru_size: 10`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("topn group1.name3 is created"))
+               // list
+               rootCmd.SetArgs([]string{"topn", "list", "-g", "group1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := 
new(databasev1.TopNAggregationRegistryServiceListResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.TopNAggregation).To(HaveLen(2))
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
+
+var _ = Describe("Topn Data Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var now time.Time
+       var startStr, endStr string
+       var interval time.Duration
+       BeforeEach(func() {
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
+               startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
+               interval = 1 * time.Millisecond
+               endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
+               grpcAddr, addr, deferFunc = setup.Standalone()
+               addr = httpSchema + addr
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+       })
+
+       It("query all topn data", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+               cases_measure_data.Write(conn, "service_instance_cpm_minute", 
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+               rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+name: service_instance_cpm_minute
+groups: ["sw_metric"]
+timeRange:
+  begin: %s
+  end: %s
+tagProjection:
+  tagFamilies:
+    - name: default
+      tags:
+        - id
+        - entity_id
+        - service_id`, startStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       GinkgoWriter.Println(out)
+                       resp := new(measurev1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.DataPoints)
+               }, flags.EventuallyTimeout).Should(Equal(6))
+
+               rootCmd.SetArgs([]string{"topn", "query", "-a", addr, "-f", 
"-"})
+               issue1 := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+name: service_instance_cpm_minute_top_bottom_100
+groups: ["sw_metric"]
+timeRange:
+  begin: %s
+  end: %s
+topN: 3
+agg: 2
+fieldValueSort: 1`, startStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue1, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue1()
+                       GinkgoWriter.Println(out)
+                       resp := new(measurev1.TopNResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Lists[0].Items)
+               }, flags.EventuallyTimeout).Should(Equal(3))
+       })
+
+       DescribeTable("query topn data with time range flags", func(timeArgs 
...string) {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+               now := timestamp.NowMilli()
+               interval := time.Minute
+               cases_measure_data.Write(conn, "service_instance_cpm_minute", 
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+
+               args := []string{"topn", "query", "-a", addr}
+               args = append(args, timeArgs...)
+               args = append(args, "-f", "-")
+               rootCmd.SetArgs(args)
+               issue1 := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+name: service_instance_cpm_minute_top_bottom_100
+groups: ["sw_metric"]
+topN: 3
+agg: 2
+fieldValueSort: 1`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue1, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue1()
+                       GinkgoWriter.Println(out)
+                       resp := new(measurev1.TopNResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Lists[0].Items)
+               }, flags.EventuallyTimeout).Should(Equal(3))
+       },
+               Entry("relative start", "--start", "-30m"),
+               Entry("relative end", "--end", "0m"),
+               Entry("absolute start", "--start", startStr),
+               Entry("absolute end", "--end", endStr),
+               Entry("default"),
+               Entry("all relative", "--start", "-30m", "--end", "0m"),
+               Entry("all absolute", "--start", startStr, "--end", endStr),
+       )
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
diff --git a/docs/concept/data-model.md b/docs/concept/data-model.md
index 7dc8c615..00a6a522 100644
--- a/docs/concept/data-model.md
+++ b/docs/concept/data-model.md
@@ -164,7 +164,7 @@ field_name: value
 field_value_sort: SORT_UNSPECIFIED
 group_by_tag_names:
 - entity_id
-counters_number: 10000
+counters_number: 1000
 lru_size: 10
 ```
 
diff --git a/docs/interacting/bydbctl/query/top-n-aggregation.md 
b/docs/interacting/bydbctl/query/top-n-aggregation.md
new file mode 100644
index 00000000..7c20a87c
--- /dev/null
+++ b/docs/interacting/bydbctl/query/top-n-aggregation.md
@@ -0,0 +1,124 @@
+# Query [TopNAggregation](../../../concept/data-model.md#topnaggregation)
+
+Query operation queries the data in a top-n-aggregation.
+
+[bydbctl](../bydbctl.md) is the command line tool in examples.
+
+The input contains two parts:
+
+- Request: a YAML-based text which is defined by the [API](#api-reference)
+- Time Range: YAML and CLI's flags both support it.
+
+## Time Range
+
+The query specification contains a `time_range` field. The request should set 
absolute times to it. `bydbctl` also provides `start` and `end` flags to 
support passing absolute and relative times.
+
+"start" and "end" specify the time range over which the query is performed. 
Each can be an absolute time like 
["2006-01-02T15:04:05Z07:00"](https://www.rfc-editor.org/rfc/rfc3339),
+or a time relative to the current time, like "-30m" or "30m".
+They are both optional and their default values follow the rules below:
+
+- when "start" and "end" are both absent, "start = now - 30 minutes" and "end 
= now",
+  namely past 30 minutes;
+- when "start" is absent and "end" is present, this command calculates "start" 
(minus 30 units),
+  e.g. "end = 2022-11-09T12:34:00Z", so "start = end - 30 minutes = 
2022-11-09T12:04:00Z";
+- when "start" is present and "end" is absent, this command calculates "end" 
(plus 30 units),
+  e.g. "start = 2022-11-09T12:04:00Z", so "end = start + 30 minutes = 
2022-11-09T12:34:00Z".
+
+## Understand the schema you are querying
+
+Before querying the data, you need to know the top-n-aggregation name, source 
measure and other basic information in the top-n-aggregation. You can use the 
`bydbctl topn get` command to get the top-n-aggregation schema.
+If you want to get the schema of a top-n-aggregation named 
`service_instance_cpm_minute_top_bottom_100` in the group `sw_metric`, you can 
use the below command:
+
+```shell
+bydbctl topn get -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+```shell
+topNAggregation:
+  countersNumber: 1000
+  criteria: null
+  fieldName: value
+  fieldValueSort: SORT_UNSPECIFIED
+  groupByTagNames:
+  - service_id
+  lruSize: 10
+  metadata:
+    group: sw_metric
+    name: service_instance_cpm_minute_top_bottom_100
+  sourceMeasure:
+    group: sw_metric
+    name: service_instance_cpm_minute
+  updatedAt: null
+```
+
+## Examples
+
+The following examples use the above schema to show how to query data in a 
top-n-aggregation and cover some common use cases:
+
+### Query between specific time range
+
+To retrieve a series of items between `2021-09-01T23:10:00+08:00` and 
`2021-09-01T23:35:00+08:00`, use the command below. The request also projects 
the tag `service_id`, which lives in a family named `default`. The fieldValueSort is 
an enumeration type, with `0` for SORT_UNSPECIFIED, `1` for SORT_DESC, and `2` 
for SORT_ASC.
+
+```shell
+bydbctl topn query -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+    - name: "default"
+      tags: ["service_id"]
+timeRange:
+  begin: 2021-09-01T23:10:00+08:00
+  end: 2021-09-01T23:35:00+08:00
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+EOF
+```
+
+### Query using relative time duration
+
+The command below queries data from the last 30 minutes using a relative time 
duration:
+
+```shell
+bydbctl topn query --start -30m -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+tagProjection:
+  tagFamilies:
+    - name: "default"
+      tags: ["service_id"]
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+EOF
+```
+
+### Query with filter
+
+The command below queries data with a filter that matches items whose service_id is 
`svc_1`:
+
+```shell
+bydbctl topn query -f - <<EOF
+name: "service_instance_cpm_minute_top_bottom_100"
+groups: ["sw_metric"]
+topN: 3
+agg: "AGGREGATION_FUNCTION_MAX"
+fieldValueSort: "SORT_DESC"
+conditions:
+  - name: "service_id"
+    op: "BINARY_OP_EQ"
+    value:
+      str:
+        value: "svc_1"
+EOF
+```
+Note: Only tags defined in `groupByTagNames` and used with the `EQ` operation 
are supported for filtering.
+
+More filter operations can be found [here](filter-operation.md).
+
+### More examples can be found 
[here](https://github.com/apache/skywalking-banyandb/tree/main/test/cases/topn/data/input).
+
+## API Reference
+
+[MeasureService v1](../../../api-reference.md#measureservice)
diff --git a/docs/interacting/bydbctl/schema/top-n-aggregation.md 
b/docs/interacting/bydbctl/schema/top-n-aggregation.md
new file mode 100644
index 00000000..192e4a56
--- /dev/null
+++ b/docs/interacting/bydbctl/schema/top-n-aggregation.md
@@ -0,0 +1,163 @@
+# CRUD [TopNAggregation](../../../concept/data-model.md#topnaggregation)
+
+CRUD operations create, read, update and delete top-n-aggregations.
+
+[bydbctl](../bydbctl.md) is the command line tool in examples.
+
+Finding the Top-N entities from a dataset in a time range is a common scenario. 
Top-n-aggregation aims to pre-calculate the top/bottom entities during the 
measure writing phase.
+
+## Create operation
+
+Create operation adds a new top-n-aggregation to the database's metadata 
registry repository. If the top-n-aggregation does not currently exist, create 
operation will create the schema.
+
+### Examples of creating
+
+A top-n-aggregation belongs to a unique group. We should create such a group 
with a catalog `CATALOG_MEASURE` before creating a top-n-aggregation.
+
+```shell
+bydbctl group create -f - <<EOF
+metadata:
+  name: sw_metric
+catalog: CATALOG_MEASURE
+resource_opts:
+  shard_num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7
+EOF
+```
+
+The group creates two shards to store data points. Every day, it would create a
+segment that will generate a block every 2 hours.
+
+The data in this group will be kept for 7 days.
+
+The top-n-aggregation monitors the data ingestion of the source measure while 
generating ranked top-N entities. We should create such a measure before 
creating a top-n-aggregation.
+
+```shell
+bydbctl measure create -f - <<EOF
+metadata:
+  name: service_instance_cpm_minute
+  group: sw_metric
+tag_families:
+  - name: default
+    tags:
+      - name: id
+        type: TAG_TYPE_STRING
+      - name: entity_id
+        type: TAG_TYPE_STRING
+      - name: service_id
+        type: TAG_TYPE_STRING
+fields:
+  - name: total
+    field_type: FIELD_TYPE_INT
+    encoding_method: ENCODING_METHOD_GORILLA
+    compression_method: COMPRESSION_METHOD_ZSTD
+  - name: value
+    field_type: FIELD_TYPE_INT
+    encoding_method: ENCODING_METHOD_GORILLA
+    compression_method: COMPRESSION_METHOD_ZSTD
+entity:
+  tagNames: 
+    - service_id
+    - entity_id
+sharding_key:
+  tagNames: 
+    - service_id
+interval: 1m
+EOF
+```
+
+`service_instance_cpm_minute` expects to ingest per-minute metrics for 
individual service instances. 
+
+Then, the below command will create a new top-n-aggregation:
+
+```shell
+bydbctl topn create -f - <<EOF
+metadata:
+  name: service_instance_cpm_minute_top_bottom_100
+  group: sw_metric
+source_measure:
+  name: service_instance_cpm_minute
+  group: sw_metric
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+  - service_id
+counters_number: 1000
+lru_size: 10
+EOF
+```
+`service_instance_cpm_minute_top_bottom_100` watches the data ingestion of 
the source measure `service_instance_cpm_minute` to generate both top 1000 and 
bottom 1000 entity cardinalities. If only Top 1000 or Bottom 1000 is needed, 
the `field_value_sort` could be `SORT_DESC` or `SORT_ASC` respectively.
+
+- `SORT_DESC`: Top-N. In a series of `1,2,3...1000`. Top10’s result is 
`1000,999...991`.
+- `SORT_ASC`: Bottom-N. In a series of `1,2,3...1000`. Bottom10’s result is 
`1,2...10`.
+
+Tags in `group_by_tag_names` are used as dimensions. These tags can be 
searched (only equality is supported) in the query phase. Tags that do not 
exist in `group_by_tag_names` will be dropped in the pre-calculating phase.
+
+`counters_number` denotes the entity cardinality. As the above 
example shows, calculating the Top 100 among 10 thousand is easier than among 
10 million.
+
+`lru_size` is a late data optimizing flag. The higher the number, the more 
late data can be handled, but the more memory space is consumed.
+
+More top-n-aggregation information can be found in 
[here](../../../concept/data-model.md#topnaggregation).
+
+## Get operation
+
+Get(Read) operation gets a top-n-aggregation's schema.
+
+### Examples of getting
+
+```shell
+bydbctl topn get -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+## Update operation
+
+Update operation changes a top-n-aggregation's schema.
+
+### Examples of updating
+
+```shell
+bydbctl topn update -f - <<EOF
+metadata:
+  name: service_instance_cpm_minute_top_bottom_100
+  group: sw_metric
+source_measure:
+  name: service_instance_cpm_minute
+  group: sw_metric
+field_name: value
+field_value_sort: SORT_UNSPECIFIED
+group_by_tag_names:
+  - service_id
+  - entity_id
+counters_number: 1000
+lru_size: 10
+EOF
+```
+
+## Delete operation
+
+Delete operation removes a top-n-aggregation's schema.
+
+### Examples of deleting
+
+```shell
+bydbctl topn delete -g sw_metric -n service_instance_cpm_minute_top_bottom_100
+```
+
+## List operation
+
+The list operation shows all top-n-aggregations' schemas in a group.
+
+### Examples of listing
+
+```shell
+bydbctl topn list -g sw_metric
+```
+
+## API Reference
+
+[TopNAggregation Registration 
Operations](../../../api-reference.md#topnaggregationregistryservice)
diff --git a/docs/menu.yml b/docs/menu.yml
index 779ad336..4e2d28ec 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -57,6 +57,8 @@ catalog:
                 path: "/interacting/bydbctl/schema/index-rule"
               - name: "IndexRuleBinding"
                 path: "/interacting/bydbctl/schema/index-rule-binding"
+              - name: "Top N Aggregation"
+                path: "/interacting/bydbctl/schema/top-n-aggregation"
           - name: "Querying Data"
             catalog:
               - name: "Measure"
@@ -65,6 +67,8 @@ catalog:
                 path: "/interacting/bydbctl/query/stream"
               - name: "Filter Operation"
                 path: "/interacting/bydbctl/query/filter-operation"
+              - name: "Top N Aggregation"
+                path: "/interacting/bydbctl/query/top-n-aggregation"
           - name: "CRUD Property"
             path: "/interacting/bydbctl/property"
           - name: "Analyzing Data"

Reply via email to