This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch ql in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b91e936795438f0f7592ed591c1154a5e900b825 Author: kezhenxu94 <[email protected]> AuthorDate: Fri Sep 19 16:44:32 2025 +0800 stream query ok --- bydbctl/internal/cmd/bydbql.go | 40 ++-- bydbctl/internal/cmd/bydbql_test.go | 438 ++++++++++++++++-------------------- bydbctl/internal/cmd/rest.go | 4 +- bydbctl/internal/cmd/stream_test.go | 1 + pkg/bydbql/bydbql_test.go | 67 ++++-- pkg/bydbql/translator.go | 83 ++++--- 6 files changed, 315 insertions(+), 318 deletions(-) diff --git a/bydbctl/internal/cmd/bydbql.go b/bydbctl/internal/cmd/bydbql.go index 67545eae..387898d7 100644 --- a/bydbctl/internal/cmd/bydbql.go +++ b/bydbctl/internal/cmd/bydbql.go @@ -28,6 +28,7 @@ import ( "github.com/go-resty/resty/v2" "github.com/spf13/cobra" "github.com/spf13/viper" + "sigs.k8s.io/yaml" "github.com/apache/skywalking-banyandb/pkg/bydbql" @@ -36,10 +37,10 @@ import ( // BydbQL endpoint paths const ( - streamQueryPath = "/api/v1/stream/query" - measureQueryPath = "/api/v1/measure/query" - traceQueryPath = "/api/v1/trace/query" - propertyQueryPath = "/api/v1/property/query" + streamQueryPath = "/api/v1/stream/data" + measureQueryPath = "/api/v1/measure/data" + traceQueryPath = "/api/v1/trace/data" + propertyQueryPath = "/api/v1/property/data/query" topnQueryPath = "/api/v1/measure/topn" ) @@ -228,22 +229,21 @@ func executeBydbQLQuery(query string) error { if err != nil { return fmt.Errorf("translation error: %w", err) } - fmt.Printf("Translated YAML:\n%s\n", yamlData) // Convert to JSON for REST API - jsonData, err := json.Marshal(yamlData) - if err != nil { - return fmt.Errorf("JSON conversion error: %w", err) - } - - // Determine endpoint based on query type + // jsonData, err := json.Marshal(yamlData) + // if err != nil { + // return fmt.Errorf("JSON conversion error: %w", err) + // } + // + // // Determine endpoint based on query type endpoint, err := determineEndpoint(parsed) if err != nil { return err } // Execute the query via REST API - return executeRESTQuery(endpoint, jsonData) + return executeRESTQuery(endpoint, yamlData) } // determineEndpoint determines the REST endpoint based on query type @@ -279,18 +279,21 @@ func determineEndpoint(parsed *bydbql.ParsedQuery) (string, error) { } // executeRESTQuery executes the query via REST API -func executeRESTQuery(endpoint string, jsonData []byte) error { - // Create REST request using existing infrastructure - reqBodyData := reqBody{ - data: jsonData, - } - +func executeRESTQuery(endpoint string, jsonData map[string]any) error { // Use existing rest function with appropriate parameters return rest( func() ([]reqBody, error) { + data, err := json.Marshal(jsonData) + if err != nil { + return nil, fmt.Errorf("JSON conversion error: %w", err) + } + + reqBodyData := reqBody{data: data} + return []reqBody{reqBodyData}, nil }, func(req request) (*resty.Response, error) { + fmt.Printf("Executing query at endpoint %s\n", string(req.data)) url := viper.GetString("addr") + endpoint resp, err := req.req. SetHeader("Content-Type", "application/json"). @@ -375,4 +378,3 @@ func printQueryResult(data []byte) error { fmt.Print(string(yamlData)) return nil } - diff --git a/bydbctl/internal/cmd/bydbql_test.go b/bydbctl/internal/cmd/bydbql_test.go index 1b13e792..361d286d 100644 --- a/bydbctl/internal/cmd/bydbql_test.go +++ b/bydbctl/internal/cmd/bydbql_test.go @@ -15,27 +15,208 @@ // specific language governing permissions and limitations // under the License. -package cmd +package cmd_test import ( - "bytes" + "fmt" "strings" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/spf13/cobra" - "github.com/spf13/viper" "github.com/zenizh/go-capturer" - "sigs.k8s.io/yaml" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" - "github.com/apache/skywalking-banyandb/pkg/bydbql" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" + cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data" ) -const httpSchema = "http://" +var _ = Describe("BydbQL Stream Query", func() { + var addr, grpcAddr string + var deferFunc func() + var rootCmd *cobra.Command + var now time.Time + var nowStr, endStr string + var interval time.Duration + + BeforeEach(func() { + var err error + now, err = time.ParseInLocation("2006-01-02T15:04:05", "2021-09-01T23:30:00", time.Local) + Expect(err).NotTo(HaveOccurred()) + nowStr = now.Format(time.RFC3339) + interval = 500 * time.Millisecond + endStr = now.Add(1 * time.Hour).Format(time.RFC3339) + grpcAddr, addr, deferFunc = setup.Standalone() + addr = httpSchema + addr + rootCmd = &cobra.Command{Use: "root"} + cmd.RootCmdFlags(rootCmd) + }) + + It("executes stream query with BydbQL", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + // Write test data + cases_stream_data.Write(conn, "sw", now, interval) + + // Test BydbQL query execution + rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(` +SELECT trace_id FROM STREAM sw in (default) TIME BETWEEN '%s' AND '%s'`, nowStr, endStr))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) + Eventually(func() int { + out := issue() + resp := new(streamv1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + GinkgoWriter.Println(resp) + return len(resp.Elements) + }, flags.EventuallyTimeout).Should(Equal(5)) + }) + + It("executes stream query with WHERE condition", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + // Write test data + cases_stream_data.Write(conn, "sw", now, interval) + + // Test BydbQL query with WHERE condition + rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(` +SELECT trace_id FROM STREAM sw in (default) WHERE trace_id = 'trace-1' TIME BETWEEN '%s' AND '%s' LIMIT 10`, nowStr, endStr))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) + Eventually(func() int { + out := issue() + resp := new(streamv1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + GinkgoWriter.Println(resp) + return len(resp.Elements) + }, time.Microsecond).Should(Equal(1)) + }) + + It("executes stream query with relative time range", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + // Write test data + cases_stream_data.Write(conn, "sw", now, interval) + + // Test BydbQL query with relative time range + rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(` +SELECT trace_id FROM STREAM sw TIME > '-30m' LIMIT 10`)) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) + Eventually(func() int { + out := issue() + resp := new(streamv1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + GinkgoWriter.Println(resp) + return len(resp.Elements) + }, flags.EventuallyTimeout).Should(Equal(5)) + }) + + It("executes stream query with multiple projections", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + // Write test data + cases_stream_data.Write(conn, "sw", now, interval) + + // Test BydbQL query with multiple projections + rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(fmt.Sprintf(` +SELECT trace_id, service_id FROM STREAM sw TIME BETWEEN '%s' AND '%s' LIMIT 10`, nowStr, endStr))) + return capturer.CaptureStdout(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) + Eventually(func() int { + out := issue() + resp := new(streamv1.QueryResponse) + helpers.UnmarshalYAML([]byte(out), resp) + GinkgoWriter.Println(resp) + return len(resp.Elements) + }, flags.EventuallyTimeout).Should(Equal(5)) + }) + + It("handles invalid BydbQL syntax", func() { + conn, err := grpclib.NewClient( + grpcAddr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + + // Write test data + cases_stream_data.Write(conn, "sw", now, interval) + + // Test invalid BydbQL query + rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", "-"}) + issue := func() string { + rootCmd.SetIn(strings.NewReader(` +SELECT FROM STREAM sw`)) + return capturer.CaptureStderr(func() { + err := rootCmd.Execute() + if err != nil { + GinkgoWriter.Printf("execution fails:%v", err) + } + }) + } + Eventually(issue, flags.EventuallyTimeout).Should(ContainSubstring("parsing errors")) + }) + + AfterEach(func() { + deferFunc() + }) +}) -var _ = Describe("BydbQL Command", func() { +var _ = Describe("BydbQL Command Structure", func() { var addr string var deferFunc func() var rootCmd *cobra.Command @@ -44,14 +225,9 @@ var _ = Describe("BydbQL Command", func() { _, addr, deferFunc = setup.EmptyStandalone() addr = httpSchema + addr - // Reset viper for clean test state - viper.Reset() - viper.Set("addr", addr) - viper.Set("group", "default") - // Create root command with all subcommands rootCmd = &cobra.Command{Use: "root"} - RootCmdFlags(rootCmd) + cmd.RootCmdFlags(rootCmd) }) AfterEach(func() { @@ -82,85 +258,10 @@ var _ = Describe("BydbQL Command", func() { } Expect(subCommandNames).To(ContainElements("interactive", "query")) }) -}) - -var _ = Describe("Endpoint Determination", func() { - DescribeTable("determines correct endpoint for different query types", - func(query, expectedEndpoint string) { - parsed, errors := bydbql.ParseQuery(query) - Expect(errors).To(BeEmpty()) - - endpoint, err := determineEndpoint(parsed) - Expect(err).NotTo(HaveOccurred()) - Expect(endpoint).To(Equal(expectedEndpoint)) - }, - Entry("stream query", "SELECT * FROM STREAM sw", streamQueryPath), - Entry("measure query", "SELECT * FROM MEASURE metrics", measureQueryPath), - Entry("trace query", "SELECT * FROM TRACE traces", traceQueryPath), - Entry("property query", "SELECT * FROM PROPERTY props", propertyQueryPath), - Entry("top-n query", "SHOW TOP 10 FROM MEASURE metrics", topnQueryPath), - ) -}) - -var _ = Describe("Query Execution Parse Errors", func() { - DescribeTable("handles parsing errors correctly", - func(query string) { - err := executeBydbQLQuery(query) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("parsing errors")) - }, - Entry("incomplete SELECT", "SELECT"), - Entry("missing resource", "SELECT * FROM"), - Entry("missing N in TOP", "SHOW TOP FROM MEASURE metrics"), - ) -}) - -var _ = Describe("Valid Query Parsing", func() { - DescribeTable("parses valid queries correctly", - func(query string) { - // Parse and validate query structure - parsed, errors := bydbql.ParseQuery(query) - Expect(errors).To(BeEmpty()) - Expect(parsed).NotTo(BeNil()) - - // Validate endpoint determination - _, err := determineEndpoint(parsed) - Expect(err).NotTo(HaveOccurred()) - }, - Entry("stream query", "SELECT * FROM STREAM sw"), - Entry("measure query with aggregation", "SELECT region, SUM(latency) FROM MEASURE metrics GROUP BY region"), - Entry("top-n query", "SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC"), - Entry("trace query with condition", "SELECT * FROM TRACE traces WHERE status = 'error'"), - Entry("property query with condition", "SELECT ip, owner FROM PROPERTY metadata WHERE datacenter = 'dc-1'"), - ) -}) - -var _ = Describe("Help Command", func() { - var addr string - var deferFunc func() - var rootCmd *cobra.Command - - BeforeEach(func() { - _, addr, deferFunc = setup.EmptyStandalone() - addr = httpSchema + addr - - // Reset viper for clean test state - viper.Reset() - viper.Set("addr", addr) - viper.Set("group", "default") - - // Create root command with all subcommands - rootCmd = &cobra.Command{Use: "root"} - RootCmdFlags(rootCmd) - }) - - AfterEach(func() { - deferFunc() - }) It("displays help information correctly", func() { // Capture help output - var output bytes.Buffer + var output strings.Builder rootCmd.SetOut(&output) // Execute help for bydbql command @@ -184,162 +285,3 @@ var _ = Describe("Help Command", func() { } }) }) - -var _ = Describe("BydbQL Integration", func() { - DescribeTable("full pipeline processing", - func(query string, resourceType bydbql.ResourceType) { - // Step 1: Parse query - parsed, errors := bydbql.ParseQuery(query) - Expect(errors).To(BeEmpty()) - Expect(parsed).NotTo(BeNil()) - - // Step 2: Validate resource type detection - if parsed.ResourceType != resourceType { - // For auto-detected resource types, this might be expected - GinkgoWriter.Printf("Resource type: expected %s, got %s (may be auto-detected)", - resourceType.String(), parsed.ResourceType.String()) - } - - // Step 3: Translate to YAML - context := &bydbql.QueryContext{ - DefaultGroup: "default", - } - translator := bydbql.NewTranslator(context) - yamlData, err := translator.TranslateToYAML(parsed) - Expect(err).NotTo(HaveOccurred()) - Expect(yamlData).NotTo(BeEmpty()) - - // Step 4: Determine correct endpoint - endpoint, err := determineEndpoint(parsed) - Expect(err).NotTo(HaveOccurred()) - - // Step 5: Validate endpoint matches expected resource type - expectedEndpoints := map[bydbql.ResourceType]string{ - bydbql.ResourceTypeStream: streamQueryPath, - bydbql.ResourceTypeMeasure: measureQueryPath, - bydbql.ResourceTypeTrace: traceQueryPath, - bydbql.ResourceTypeProperty: propertyQueryPath, - } - - // For Top-N queries, always use measure endpoint - if _, isTopN := parsed.Statement.(*bydbql.TopNStatement); isTopN { - Expect(endpoint).To(Equal(topnQueryPath)) - } else if expectedEndpoint, ok := expectedEndpoints[resourceType]; ok { - Expect(endpoint).To(Equal(expectedEndpoint)) - } - - // Step 6: Validate YAML structure - var yamlMap map[string]interface{} - err = yaml.Unmarshal(yamlData, &yamlMap) - Expect(err).NotTo(HaveOccurred()) - - // Basic validation: should have name or resource identifier - Expect(yamlMap["Name"] != nil || yamlMap["topN"] != nil).To(BeTrue()) - - GinkgoWriter.Printf("Successfully processed query: %s", query) - GinkgoWriter.Printf("Generated YAML length: %d bytes", len(yamlData)) - GinkgoWriter.Printf("Target endpoint: %s", endpoint) - }, - Entry("stream query full pipeline", - "SELECT trace_id, service_id FROM STREAM sw WHERE service_id = 'webapp' TIME > '-30m' LIMIT 100", - bydbql.ResourceTypeStream), - Entry("measure query full pipeline", - "SELECT region, AVG(latency) FROM MEASURE service_metrics WHERE service = 'auth' GROUP BY region", - bydbql.ResourceTypeMeasure), - Entry("topn query full pipeline", - "SHOW TOP 5 FROM MEASURE error_count WHERE status_code = '500'", - bydbql.ResourceTypeMeasure), - Entry("trace query full pipeline", - "SELECT * FROM TRACE app_traces WHERE operation_name = 'GET /api/users' TIME BETWEEN '-1h' AND 'now'", - bydbql.ResourceTypeTrace), - Entry("property query full pipeline", - "SELECT ip, region FROM PROPERTY server_info WHERE ID = 'server-1'", - bydbql.ResourceTypeProperty), - ) -}) - -var _ = Describe("Command Line Flags", func() { - var addr string - var deferFunc func() - var rootCmd *cobra.Command - - BeforeEach(func() { - _, addr, deferFunc = setup.EmptyStandalone() - addr = httpSchema + addr - - // Reset viper for clean test state - viper.Reset() - viper.Set("addr", addr) - viper.Set("group", "default") - - // Create root command with all subcommands - rootCmd = &cobra.Command{Use: "root"} - RootCmdFlags(rootCmd) - }) - - AfterEach(func() { - deferFunc() - }) - - DescribeTable("handles command line flags correctly", - func(args []string, expectError bool) { - // Prepend "bydbql" to the args since we're testing the bydbql subcommand - fullArgs := append([]string{"bydbql"}, args...) - rootCmd.SetArgs(fullArgs) - - // We're testing command structure, not execution - // So we expect parsing to succeed even if execution might fail - err := rootCmd.ParseFlags(fullArgs) - - if expectError { - Expect(err).To(HaveOccurred()) - } else { - Expect(err).NotTo(HaveOccurred()) - } - }, - Entry("valid query command with file", []string{"query", "-f", "test.sql"}, false), - Entry("valid interactive command", []string{"interactive"}, false), - Entry("alias i for interactive", []string{"i"}, false), - ) -}) - -var _ = Describe("BydbQL Query Execution", func() { - var addr string - var deferFunc func() - var rootCmd *cobra.Command - - BeforeEach(func() { - _, addr, deferFunc = setup.EmptyStandalone() - addr = httpSchema + addr - - // Reset viper for clean test state - viper.Reset() - viper.Set("addr", addr) - viper.Set("group", "default") - - // Create root command with all subcommands - rootCmd = &cobra.Command{Use: "root"} - RootCmdFlags(rootCmd) - }) - - AfterEach(func() { - deferFunc() - }) - - It("executes simple query successfully", func() { - // Test a simple query execution - rootCmd.SetArgs([]string{"bydbql", "query", "-f", "-"}) - rootCmd.SetIn(strings.NewReader("SELECT * FROM STREAM sw LIMIT 1")) - - executeQuery := func() string { - return capturer.CaptureStdout(func() { - err := rootCmd.Execute() - if err != nil { - GinkgoWriter.Printf("execution fails: %v", err) - } - }) - } - - Eventually(executeQuery, flags.EventuallyTimeout).ShouldNot(ContainSubstring("error")) - }) -}) diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go index 3608128f..10af7be7 100644 --- a/bydbctl/internal/cmd/rest.go +++ b/bydbctl/internal/cmd/rest.go @@ -203,8 +203,8 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader) (requests []reqBody, err er } startTS = endTS.Add(-timeRange) } - s := startTS.Format(time.RFC3339) - e := endTS.Format(time.RFC3339) + s := startTS.Format("2006-01-02T15:04:05-07:00") + e := endTS.Format("2006-01-02T15:04:05-07:00") var rawRequests []reqBody if rawRequests, err = parseNameAndGroupFromYAML(reader); err != nil { return nil, err diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go index 4423e134..687e396b 100644 --- a/bydbctl/internal/cmd/stream_test.go +++ b/bydbctl/internal/cmd/stream_test.go @@ -237,6 +237,7 @@ projection: Eventually(issue, flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:")) Eventually(func() int { out := issue() + GinkgoWriter.Println(out) resp := new(streamv1.QueryResponse) helpers.UnmarshalYAML([]byte(out), resp) GinkgoWriter.Println(resp) diff --git a/pkg/bydbql/bydbql_test.go b/pkg/bydbql/bydbql_test.go index 57e8d8f8..d993b085 100644 --- a/pkg/bydbql/bydbql_test.go +++ b/pkg/bydbql/bydbql_test.go @@ -202,31 +202,31 @@ var _ = Describe("Translator", func() { Entry("simple stream query", "SELECT * FROM STREAM sw", func(data map[string]any) bool { - return data["Name"] == "sw" + return data["name"] == "sw" }), Entry("measure query with aggregation", "SELECT region, SUM(latency) FROM MEASURE metrics GROUP BY region", func(data map[string]any) bool { - agg, ok := data["Agg"].(map[string]any) - return ok && agg["Function"] == "SUM" && agg["FieldName"] == "latency" + agg, ok := data["agg"].(map[string]any) + return ok && agg["function"] == "SUM" && agg["field_name"] == "latency" }), Entry("time range query", "SELECT * FROM STREAM sw TIME BETWEEN '2023-01-01T00:00:00Z' AND '2023-01-02T00:00:00Z'", func(data map[string]any) bool { - timeRange, ok := data["TimeRange"].(map[string]any) - return ok && timeRange["Begin"] == "2023-01-01T00:00:00Z" && - timeRange["End"] == "2023-01-02T00:00:00Z" + timeRange, ok := data["timeRange"].(map[string]any) + return ok && timeRange["begin"] == "2023-01-01T00:00:00Z" && + timeRange["end"] == "2023-01-02T00:00:00Z" }), Entry("relative time query", "SELECT * FROM STREAM sw TIME > '-30m'", func(data map[string]any) bool { - timeRange, ok := data["TimeRange"].(map[string]any) - return ok && timeRange["Begin"] != nil && timeRange["End"] != nil + timeRange, ok := data["timeRange"].(map[string]any) + return ok && timeRange["begin"] != nil && timeRange["end"] != nil }), Entry("WHERE clause with criteria", "SELECT * FROM STREAM sw WHERE service_id = 'webapp'", func(data map[string]any) bool { - criteria, ok := data["Criteria"].([]any) + criteria, ok := data["criteria"].([]any) if !ok || len(criteria) == 0 { return false } @@ -236,20 +236,49 @@ var _ = Describe("Translator", func() { Entry("TOP N query", "SHOW TOP 10 FROM MEASURE service_latency ORDER BY value DESC", func(data map[string]any) bool { - topN := data["TopN"] - fieldValueSort := data["FieldValueSort"] - return topN == float64(10) && fieldValueSort == "DESC" + topN := data["top_n"] + fieldValueSort := data["field_value_sort"] + return topN == 10 && fieldValueSort == "DESC" }), Entry("property query with IDs", "SELECT * FROM PROPERTY metadata WHERE ID = 'id1' OR ID = 'id2'", func(data map[string]any) bool { - criteria, ok := data["Criteria"].([]any) + criteria, ok := data["criteria"].([]any) return ok && len(criteria) == 2 }), Entry("query trace enabled", "SELECT * FROM STREAM sw WITH QUERY_TRACE", func(data map[string]any) bool { - return data["Trace"] == true + return data["trace"] == true + }), + Entry("stream query with projection tagFamilies format", + "SELECT trace_id FROM STREAM sw", + func(data map[string]any) bool { + // Verify that projection is translated to tagFamilies format with snake_case + projection, ok := data["projection"].(map[string]any) + if !ok { + return false + } + tagFamiliesRaw := projection["tagFamilies"] + tagFamilies, ok := tagFamiliesRaw.([]interface{}) + if !ok || len(tagFamilies) == 0 { + return false + } + tagFamily, ok := tagFamilies[0].(map[string]interface{}) + if !ok { + return false + } + name, ok := tagFamily["name"].(string) + if !ok || name != "searchable" { + return false + } + tagsRaw := tagFamily["tags"] + tags, ok := tagsRaw.([]interface{}) + if !ok || len(tags) == 0 { + return false + } + tag, ok := tags[0].(string) + return ok && tag == "trace_id" }), ) }) @@ -370,15 +399,15 @@ var _ = Describe("Time Format Parsing", func() { Entry("absolute time range", "BETWEEN '2023-01-01T10:00:00Z' AND '2023-01-01T11:00:00Z'", func(data map[string]any) bool { - timeRange, ok := data["TimeRange"].(map[string]any) - return ok && timeRange["Begin"] == "2023-01-01T10:00:00Z" && - timeRange["End"] == "2023-01-01T11:00:00Z" + timeRange, ok := data["timeRange"].(map[string]any) + return ok && timeRange["begin"] == "2023-01-01T10:00:00Z" && + timeRange["end"] == "2023-01-01T11:00:00Z" }), Entry("relative time condition", "> '-30m'", func(data map[string]any) bool { - timeRange, ok := data["TimeRange"].(map[string]any) - return ok && timeRange["Begin"] != nil && timeRange["End"] != nil + timeRange, ok := data["timeRange"].(map[string]any) + return ok && timeRange["begin"] != nil && timeRange["end"] != nil }), ) }) diff --git a/pkg/bydbql/translator.go b/pkg/bydbql/translator.go index ca41b8fd..df25f9eb 100644 --- a/pkg/bydbql/translator.go +++ b/pkg/bydbql/translator.go @@ -24,38 +24,38 @@ import ( "time" str2duration "github.com/xhit/go-str2duration/v2" - "sigs.k8s.io/yaml" + "gopkg.in/yaml.v3" ) // QueryYAML represents the YAML structure for BanyanDB queries type QueryYAML struct { // Common fields - Name string `yaml:"name,omitempty"` - Groups []string `yaml:"groups,omitempty"` - TimeRange *TimeRangeYAML `yaml:"timeRange,omitempty"` - Criteria []map[string]interface{} `yaml:"criteria,omitempty"` - Limit *int `yaml:"limit,omitempty"` - Offset *int `yaml:"offset,omitempty"` - Trace bool `yaml:"trace,omitempty"` + Name string `yaml:"name,omitempty" json:"name,omitempty"` + Groups []string `yaml:"groups,omitempty" json:"groups,omitempty"` + TimeRange *TimeRangeYAML `yaml:"timeRange,omitempty" json:"timeRange,omitempty"` + Criteria []map[string]interface{} `yaml:"criteria,omitempty" json:"criteria,omitempty"` + Limit *int `yaml:"limit,omitempty" json:"limit,omitempty"` + Offset *int `yaml:"offset,omitempty" json:"offset,omitempty"` + Trace bool `yaml:"trace,omitempty" json:"trace,omitempty"` // Stream/Trace specific - Projection []string `yaml:"projection,omitempty"` - OrderBy *OrderByYAML `yaml:"orderBy,omitempty"` + Projection interface{} `yaml:"projection,omitempty" json:"projection,omitempty"` + OrderBy *OrderByYAML `yaml:"order_by,omitempty" json:"order_by,omitempty"` // Measure specific - TagProjection []string `yaml:"tagProjection,omitempty"` - FieldProjection []string `yaml:"fieldProjection,omitempty"` - GroupBy *GroupByYAML `yaml:"groupBy,omitempty"` - Agg *AggregationYAML `yaml:"agg,omitempty"` - Top *TopYAML `yaml:"top,omitempty"` + TagProjection []string `yaml:"tag_projection,omitempty" json:"tag_projection,omitempty"` + FieldProjection []string `yaml:"field_projection,omitempty" json:"field_projection,omitempty"` + GroupBy *GroupByYAML `yaml:"group_by,omitempty" json:"group_by,omitempty"` + Agg *AggregationYAML `yaml:"agg,omitempty" json:"agg,omitempty"` + Top *TopYAML `yaml:"top,omitempty" json:"top,omitempty"` // Property specific - IDs []string `yaml:"ids,omitempty"` + IDs []string `yaml:"ids,omitempty" json:"ids,omitempty"` // Top-N specific (for separate endpoint) - TopN int `yaml:"topN,omitempty"` - FieldValueSort string `yaml:"fieldValueSort,omitempty"` - Conditions []map[string]interface{} `yaml:"conditions,omitempty"` + TopN int `yaml:"top_n,omitempty" json:"top_n,omitempty"` + FieldValueSort string `yaml:"field_value_sort,omitempty" json:"field_value_sort,omitempty"` + Conditions []map[string]interface{} `yaml:"conditions,omitempty" json:"conditions,omitempty"` } // TimeRangeYAML represents time range in YAML @@ -66,26 +66,37 @@ type TimeRangeYAML struct { // OrderByYAML represents ORDER BY clause in YAML type OrderByYAML struct { - IndexRuleName string `yaml:"indexRuleName"` + IndexRuleName string `yaml:"index_rule_name"` Sort string `yaml:"sort"` // ASC or DESC } // GroupByYAML represents GROUP BY clause in YAML type GroupByYAML struct { - TagProjection []string `yaml:"tagProjection"` + TagProjection []string `yaml:"tag_projection"` +} + +// TagFamilyYAML represents a tag family in projection +type TagFamilyYAML struct { + Name string `yaml:"name"` + Tags []string `yaml:"tags"` +} + +// ProjectionYAML represents the projection structure for stream/trace queries +type ProjectionYAML struct { + TagFamilies []TagFamilyYAML `yaml:"tagFamilies"` } // AggregationYAML represents aggregation function in YAML type AggregationYAML struct { - Function string `yaml:"function"` - FieldName string `yaml:"fieldName"` + Function string `yaml:"function"` + FieldName string `yaml:"field_name"` } // TopYAML represents TOP N clause in YAML type TopYAML struct { - Number int `yaml:"number"` - FieldName string `yaml:"fieldName"` - FieldValueSort string `yaml:"fieldValueSort"` + Number int `yaml:"number"` + FieldName string `yaml:"field_name"` + FieldValueSort string `yaml:"field_value_sort"` } // Translator converts parsed BydbQL to YAML format @@ -209,10 +220,20 @@ func (t *Translator) translateStreamOrTraceQuery(stmt *SelectStatement, yamlQuer // SELECT * - no specific projection needed } else if stmt.Projection.Empty { // SELECT () - empty projection for traces - yamlQuery.Projection = []string{} + yamlQuery.Projection = &ProjectionYAML{ + TagFamilies: []TagFamilyYAML{}, + } } else if len(stmt.Projection.Columns) > 0 { + // Group columns by tag family (default to "searchable" for now) + tagFamily := TagFamilyYAML{ + Name: "searchable", + Tags: make([]string, 0, len(stmt.Projection.Columns)), + } for _, col := range stmt.Projection.Columns { - yamlQuery.Projection = append(yamlQuery.Projection, col.Name) + tagFamily.Tags = append(tagFamily.Tags, col.Name) + } + yamlQuery.Projection = &ProjectionYAML{ + TagFamilies: []TagFamilyYAML{tagFamily}, } } } @@ -301,9 +322,11 @@ func (t *Translator) translateMeasureQuery(stmt *SelectStatement, yamlQuery *Que func (t *Translator) translatePropertyQuery(stmt *SelectStatement, yamlQuery *QueryYAML) (*QueryYAML, error) { // Translate projection if stmt.Projection != nil && !stmt.Projection.All && len(stmt.Projection.Columns) > 0 { + projection := make([]string, 0, len(stmt.Projection.Columns)) for _, col := range stmt.Projection.Columns { - yamlQuery.Projection = append(yamlQuery.Projection, col.Name) + projection = append(projection, col.Name) } + yamlQuery.Projection = projection } // Extract IDs from WHERE clause if present @@ -588,4 +611,4 @@ func TranslateQuery(query string, context *QueryContext) ([]byte, []string, erro } return yamlBytes, nil, nil -} \ No newline at end of file +}
