This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch tracetest in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit dbe16d2a4040ca7b93dacb2845452b235819eb6b Author: kezhenxu94 <kezhenx...@apache.org> AuthorDate: Mon Sep 15 17:26:59 2025 +0800 feat: add a command to query trace by id --- bydbctl/internal/cmd/trace.go | 30 ++++++++- bydbctl/internal/cmd/trace_test.go | 126 +++++++++++++++++++++++++++++++++++++ test/cases/trace/data/data.go | 4 +- 3 files changed, 154 insertions(+), 6 deletions(-) diff --git a/bydbctl/internal/cmd/trace.go b/bydbctl/internal/cmd/trace.go index 3318a424..8337fab8 100644 --- a/bydbctl/internal/cmd/trace.go +++ b/bydbctl/internal/cmd/trace.go @@ -25,6 +25,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/pkg/version" ) @@ -137,8 +138,31 @@ func newTraceCmd() *cobra.Command { }, } - bindFileFlag(createCmd, updateCmd) - bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd) - traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd) + queryCmd := &cobra.Command{ + Use: "query -f [file|dir|-]", + Version: version.Build(), + Short: "Query data in a trace", + RunE: func(cmd *cobra.Command, _ []string) (err error) { + return rest(func() ([]reqBody, error) { return parseNameAndGroupFromYAML(cmd.InOrStdin()) }, + func(request request) (*resty.Response, error) { + queryReq := new(tracev1.QueryRequest) + err := protojson.Unmarshal(request.data, queryReq) + if err != nil { + return nil, err + } + + b, err := protojson.Marshal(queryReq) + if err != nil { + return nil, err + } + + return request.req.SetBody(b).Post(getPath("/api/v1/trace/data")) + }, yamlPrinter, enableTLS, insecure, cert) + }, + } + + bindFileFlag(createCmd, updateCmd, queryCmd) + bindTLSRelatedFlag(getCmd, createCmd, deleteCmd, updateCmd, listCmd, queryCmd) + traceCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, queryCmd) return traceCmd } diff --git a/bydbctl/internal/cmd/trace_test.go b/bydbctl/internal/cmd/trace_test.go index 16934834..86ecdfd4 100644 --- a/bydbctl/internal/cmd/trace_test.go +++ b/bydbctl/internal/cmd/trace_test.go @@ -18,24 +18,31 @@ package cmd_test import ( + "fmt" "strings" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/spf13/cobra" "github.com/zenizh/go-capturer" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" + cases_trace_data "github.com/apache/skywalking-banyandb/test/cases/trace/data" ) var _ = Describe("Trace Schema Operation", func() { var addr string var deferFunc func() var rootCmd *cobra.Command + BeforeEach(func() { _, addr, deferFunc = setup.EmptyStandalone() addr = httpSchema + addr @@ -184,3 +191,122 @@ timestamp_tag_name: timestamp`)) deferFunc() }) }) + +var _ = Describe("Trace Data Query", func() { + var addr, grpcAddr string + var deferFunc func() + var rootCmd *cobra.Command + var now time.Time + var interval time.Duration + BeforeEach(func() { + var err error + now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local) + Expect(err).NotTo(HaveOccurred()) + interval = 500 * time.Millisecond + grpcAddr, addr, deferFunc = setup.EmptyStandalone() + addr = httpSchema + addr + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + + // Create trace group + rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"}) + createGroup := func() string { + rootCmd.SetIn(strings.NewReader(` +metadata: + name: test-trace-group +catalog: CATALOG_TRACE +resource_opts: + shard_num: 2 + segment_interval: + unit: UNIT_DAY + num: 1 + ttl: + unit: UNIT_DAY + num: 7`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("group creation fails: %v", err) + } + }) + } + Eventually(createGroup, flags.EventuallyTimeout).Should(ContainSubstring("group test-trace-group is created")) + + // Create trace schema + rootCmd.SetArgs([]string{"trace", "create", "-a", addr, "-f", "-"}) + createTrace := func() string { + rootCmd.SetIn(strings.NewReader(` + metadata: + name: sw + group: test-trace-group + tags: + - name: trace_id + type: TAG_TYPE_STRING + - name: state + type: TAG_TYPE_INT + - name: service_id + type: TAG_TYPE_STRING + - name: service_instance_id + type: TAG_TYPE_STRING + - name: endpoint_id + type: TAG_TYPE_STRING + - name: duration + type: TAG_TYPE_INT + - name: timestamp + type: TAG_TYPE_TIMESTAMP + trace_id_tag_name: trace_id + timestamp_tag_name: timestamp`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("trace creation fails: %v", err) + } + }) + } + Eventually(createTrace, flags.EventuallyTimeout).Should(ContainSubstring("trace test-trace-group.sw is created")) + }) + + It("query trace by trace id", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + cases_trace_data.Write(conn, "sw", now, interval) + rootCmd.SetArgs([]string{"trace", "query", "-a", addr, "-f", "-"}) + for _, idx := range []int{1, 2, 3, 4, 5} { + issue := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(` +name: "sw" +groups: ["test-trace-group"] +tag_projection: ["trace_id"] +criteria: + condition: + name: "trace_id" + op: "BINARY_OP_EQ" + value: + str: + value: "%d"`, idx))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + Expect(err).NotTo(HaveOccurred()) + }) + } + Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) + Eventually(func() int { + out := issue() + GinkgoWriter.Println("Query output:", out) + resp := new(tracev1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + Expect(resp.Spans[0].Tags[0].Key).To(Equal("trace_id")) + Expect(resp.Spans[0].Tags[0].Value.GetStr().Value).To(Equal(fmt.Sprintf("%d", idx))) + return len(resp.Spans) + }, flags.EventuallyTimeout).Should(Equal(1)) + } + }) + + AfterEach(func() { + deferFunc() + }) +}) diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go index e35751eb..f46e591b 100644 --- a/test/cases/trace/data/data.go +++ b/test/cases/trace/data/data.go @@ -190,9 +190,7 @@ func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, baseTi } schema := databasev1.NewTraceRegistryServiceClient(conn) resp, err := schema.Get(context.Background(), &databasev1.TraceRegistryServiceGetRequest{Metadata: metadata}) - if err != nil { - return - } + gm.Expect(err).NotTo(gm.HaveOccurred()) metadata = resp.GetTrace().GetMetadata() c := tracev1.NewTraceServiceClient(conn)