This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch ql
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b91e936795438f0f7592ed591c1154a5e900b825
Author: kezhenxu94 <[email protected]>
AuthorDate: Fri Sep 19 16:44:32 2025 +0800

    stream query ok
---
 bydbctl/internal/cmd/bydbql.go      |  40 ++--
 bydbctl/internal/cmd/bydbql_test.go | 438 ++++++++++++++++--------------------
 bydbctl/internal/cmd/rest.go        |   4 +-
 bydbctl/internal/cmd/stream_test.go |   1 +
 pkg/bydbql/bydbql_test.go           |  67 ++++--
 pkg/bydbql/translator.go            |  83 ++++---
 6 files changed, 315 insertions(+), 318 deletions(-)

diff --git a/bydbctl/internal/cmd/bydbql.go b/bydbctl/internal/cmd/bydbql.go
index 67545eae..387898d7 100644
--- a/bydbctl/internal/cmd/bydbql.go
+++ b/bydbctl/internal/cmd/bydbql.go
@@ -28,6 +28,7 @@ import (
        "github.com/go-resty/resty/v2"
        "github.com/spf13/cobra"
        "github.com/spf13/viper"
+
        "sigs.k8s.io/yaml"
 
        "github.com/apache/skywalking-banyandb/pkg/bydbql"
@@ -36,10 +37,10 @@ import (
 
 // BydbQL endpoint paths
 const (
-       streamQueryPath   = "/api/v1/stream/query"
-       measureQueryPath  = "/api/v1/measure/query"
-       traceQueryPath    = "/api/v1/trace/query"
-       propertyQueryPath = "/api/v1/property/query"
+       streamQueryPath   = "/api/v1/stream/data"
+       measureQueryPath  = "/api/v1/measure/data"
+       traceQueryPath    = "/api/v1/trace/data"
+       propertyQueryPath = "/api/v1/property/data/query"
        topnQueryPath     = "/api/v1/measure/topn"
 )
 
@@ -228,22 +229,21 @@ func executeBydbQLQuery(query string) error {
        if err != nil {
                return fmt.Errorf("translation error: %w", err)
        }
-       fmt.Printf("Translated YAML:\n%s\n", yamlData)
 
        // Convert to JSON for REST API
-       jsonData, err := json.Marshal(yamlData)
-       if err != nil {
-               return fmt.Errorf("JSON conversion error: %w", err)
-       }
-
-       // Determine endpoint based on query type
+       // jsonData, err := json.Marshal(yamlData)
+       // if err != nil {
+       //      return fmt.Errorf("JSON conversion error: %w", err)
+       // }
+       //
+       // // Determine endpoint based on query type
        endpoint, err := determineEndpoint(parsed)
        if err != nil {
                return err
        }
 
        // Execute the query via REST API
-       return executeRESTQuery(endpoint, jsonData)
+       return executeRESTQuery(endpoint, yamlData)
 }
 
 // determineEndpoint determines the REST endpoint based on query type
@@ -279,18 +279,21 @@ func determineEndpoint(parsed *bydbql.ParsedQuery) 
(string, error) {
 }
 
 // executeRESTQuery executes the query via REST API
-func executeRESTQuery(endpoint string, jsonData []byte) error {
-       // Create REST request using existing infrastructure
-       reqBodyData := reqBody{
-               data: jsonData,
-       }
-
+func executeRESTQuery(endpoint string, jsonData map[string]any) error {
        // Use existing rest function with appropriate parameters
        return rest(
                func() ([]reqBody, error) {
+                       data, err := json.Marshal(jsonData)
+                       if err != nil {
+                               return nil, fmt.Errorf("JSON conversion error: 
%w", err)
+                       }
+
+                       reqBodyData := reqBody{data: data}
+
                        return []reqBody{reqBodyData}, nil
                },
                func(req request) (*resty.Response, error) {
+                       fmt.Printf("Executing query at endpoint %s\n", 
string(req.data))
                        url := viper.GetString("addr") + endpoint
                        resp, err := req.req.
                                SetHeader("Content-Type", "application/json").
@@ -375,4 +378,3 @@ func printQueryResult(data []byte) error {
        fmt.Print(string(yamlData))
        return nil
 }
-
diff --git a/bydbctl/internal/cmd/bydbql_test.go 
b/bydbctl/internal/cmd/bydbql_test.go
index 1b13e792..361d286d 100644
--- a/bydbctl/internal/cmd/bydbql_test.go
+++ b/bydbctl/internal/cmd/bydbql_test.go
@@ -15,27 +15,208 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package cmd
+package cmd_test
 
 import (
-       "bytes"
+       "fmt"
        "strings"
+       "time"
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/spf13/cobra"
-       "github.com/spf13/viper"
        "github.com/zenizh/go-capturer"
-       "sigs.k8s.io/yaml"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
 
-       "github.com/apache/skywalking-banyandb/pkg/bydbql"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
-const httpSchema = "http://";
+var _ = Describe("BydbQL Stream Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var now time.Time
+       var nowStr, endStr string
+       var interval time.Duration
+
+       BeforeEach(func() {
+               var err error
+               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
+               Expect(err).NotTo(HaveOccurred())
+               nowStr = now.Format(time.RFC3339)
+               interval = 500 * time.Millisecond
+               endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
+               grpcAddr, addr, deferFunc = setup.Standalone()
+               addr = httpSchema + addr
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+       })
+
+       It("executes stream query with BydbQL", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               // Write test data
+               cases_stream_data.Write(conn, "sw", now, interval)
+
+               // Test BydbQL query execution
+               rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+SELECT trace_id FROM STREAM sw in (default) TIME BETWEEN '%s' AND '%s'`, 
nowStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(streamv1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+       })
+
+       It("executes stream query with WHERE condition", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               // Write test data
+               cases_stream_data.Write(conn, "sw", now, interval)
+
+               // Test BydbQL query with WHERE condition
+               rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+SELECT trace_id FROM STREAM sw in (default) WHERE trace_id = 'trace-1' TIME 
BETWEEN '%s' AND '%s' LIMIT 10`, nowStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(streamv1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }, time.Microsecond).Should(Equal(1))
+       })
+
+       It("executes stream query with relative time range", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               // Write test data
+               cases_stream_data.Write(conn, "sw", now, interval)
+
+               // Test BydbQL query with relative time range
+               rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+SELECT trace_id FROM STREAM sw TIME > '-30m' LIMIT 10`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(streamv1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+       })
+
+       It("executes stream query with multiple projections", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               // Write test data
+               cases_stream_data.Write(conn, "sw", now, interval)
+
+               // Test BydbQL query with multiple projections
+               rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+SELECT trace_id, service_id FROM STREAM sw TIME BETWEEN '%s' AND '%s' LIMIT 
10`, nowStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(streamv1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }, flags.EventuallyTimeout).Should(Equal(5))
+       })
+
+       It("handles invalid BydbQL syntax", func() {
+               conn, err := grpclib.NewClient(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+
+               // Write test data
+               cases_stream_data.Write(conn, "sw", now, interval)
+
+               // Test invalid BydbQL query
+               rootCmd.SetArgs([]string{"bydbql", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+SELECT FROM STREAM sw`))
+                       return capturer.CaptureStderr(func() {
+                               err := rootCmd.Execute()
+                               if err != nil {
+                                       GinkgoWriter.Printf("execution 
fails:%v", err)
+                               }
+                       })
+               }
+               Eventually(issue, 
flags.EventuallyTimeout).Should(ContainSubstring("parsing errors"))
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
 
-var _ = Describe("BydbQL Command", func() {
+var _ = Describe("BydbQL Command Structure", func() {
        var addr string
        var deferFunc func()
        var rootCmd *cobra.Command
@@ -44,14 +225,9 @@ var _ = Describe("BydbQL Command", func() {
                _, addr, deferFunc = setup.EmptyStandalone()
                addr = httpSchema + addr
 
-               // Reset viper for clean test state
-               viper.Reset()
-               viper.Set("addr", addr)
-               viper.Set("group", "default")
-
                // Create root command with all subcommands
                rootCmd = &cobra.Command{Use: "root"}
-               RootCmdFlags(rootCmd)
+               cmd.RootCmdFlags(rootCmd)
        })
 
        AfterEach(func() {
@@ -82,85 +258,10 @@ var _ = Describe("BydbQL Command", func() {
                }
                Expect(subCommandNames).To(ContainElements("interactive", 
"query"))
        })
-})
-
-var _ = Describe("Endpoint Determination", func() {
-       DescribeTable("determines correct endpoint for different query types",
-               func(query, expectedEndpoint string) {
-                       parsed, errors := bydbql.ParseQuery(query)
-                       Expect(errors).To(BeEmpty())
-
-                       endpoint, err := determineEndpoint(parsed)
-                       Expect(err).NotTo(HaveOccurred())
-                       Expect(endpoint).To(Equal(expectedEndpoint))
-               },
-               Entry("stream query", "SELECT * FROM STREAM sw", 
streamQueryPath),
-               Entry("measure query", "SELECT * FROM MEASURE metrics", 
measureQueryPath),
-               Entry("trace query", "SELECT * FROM TRACE traces", 
traceQueryPath),
-               Entry("property query", "SELECT * FROM PROPERTY props", 
propertyQueryPath),
-               Entry("top-n query", "SHOW TOP 10 FROM MEASURE metrics", 
topnQueryPath),
-       )
-})
-
-var _ = Describe("Query Execution Parse Errors", func() {
-       DescribeTable("handles parsing errors correctly",
-               func(query string) {
-                       err := executeBydbQLQuery(query)
-                       Expect(err).To(HaveOccurred())
-                       Expect(err.Error()).To(ContainSubstring("parsing 
errors"))
-               },
-               Entry("incomplete SELECT", "SELECT"),
-               Entry("missing resource", "SELECT * FROM"),
-               Entry("missing N in TOP", "SHOW TOP FROM MEASURE metrics"),
-       )
-})
-
-var _ = Describe("Valid Query Parsing", func() {
-       DescribeTable("parses valid queries correctly",
-               func(query string) {
-                       // Parse and validate query structure
-                       parsed, errors := bydbql.ParseQuery(query)
-                       Expect(errors).To(BeEmpty())
-                       Expect(parsed).NotTo(BeNil())
-
-                       // Validate endpoint determination
-                       _, err := determineEndpoint(parsed)
-                       Expect(err).NotTo(HaveOccurred())
-               },
-               Entry("stream query", "SELECT * FROM STREAM sw"),
-               Entry("measure query with aggregation", "SELECT region, 
SUM(latency) FROM MEASURE metrics GROUP BY region"),
-               Entry("top-n query", "SHOW TOP 10 FROM MEASURE service_latency 
ORDER BY value DESC"),
-               Entry("trace query with condition", "SELECT * FROM TRACE traces 
WHERE status = 'error'"),
-               Entry("property query with condition", "SELECT ip, owner FROM 
PROPERTY metadata WHERE datacenter = 'dc-1'"),
-       )
-})
-
-var _ = Describe("Help Command", func() {
-       var addr string
-       var deferFunc func()
-       var rootCmd *cobra.Command
-
-       BeforeEach(func() {
-               _, addr, deferFunc = setup.EmptyStandalone()
-               addr = httpSchema + addr
-
-               // Reset viper for clean test state
-               viper.Reset()
-               viper.Set("addr", addr)
-               viper.Set("group", "default")
-
-               // Create root command with all subcommands
-               rootCmd = &cobra.Command{Use: "root"}
-               RootCmdFlags(rootCmd)
-       })
-
-       AfterEach(func() {
-               deferFunc()
-       })
 
        It("displays help information correctly", func() {
                // Capture help output
-               var output bytes.Buffer
+               var output strings.Builder
                rootCmd.SetOut(&output)
 
                // Execute help for bydbql command
@@ -184,162 +285,3 @@ var _ = Describe("Help Command", func() {
                }
        })
 })
-
-var _ = Describe("BydbQL Integration", func() {
-       DescribeTable("full pipeline processing",
-               func(query string, resourceType bydbql.ResourceType) {
-                       // Step 1: Parse query
-                       parsed, errors := bydbql.ParseQuery(query)
-                       Expect(errors).To(BeEmpty())
-                       Expect(parsed).NotTo(BeNil())
-
-                       // Step 2: Validate resource type detection
-                       if parsed.ResourceType != resourceType {
-                               // For auto-detected resource types, this might 
be expected
-                               GinkgoWriter.Printf("Resource type: expected 
%s, got %s (may be auto-detected)",
-                                       resourceType.String(), 
parsed.ResourceType.String())
-                       }
-
-                       // Step 3: Translate to YAML
-                       context := &bydbql.QueryContext{
-                               DefaultGroup: "default",
-                       }
-                       translator := bydbql.NewTranslator(context)
-                       yamlData, err := translator.TranslateToYAML(parsed)
-                       Expect(err).NotTo(HaveOccurred())
-                       Expect(yamlData).NotTo(BeEmpty())
-
-                       // Step 4: Determine correct endpoint
-                       endpoint, err := determineEndpoint(parsed)
-                       Expect(err).NotTo(HaveOccurred())
-
-                       // Step 5: Validate endpoint matches expected resource 
type
-                       expectedEndpoints := map[bydbql.ResourceType]string{
-                               bydbql.ResourceTypeStream:   streamQueryPath,
-                               bydbql.ResourceTypeMeasure:  measureQueryPath,
-                               bydbql.ResourceTypeTrace:    traceQueryPath,
-                               bydbql.ResourceTypeProperty: propertyQueryPath,
-                       }
-
-                       // For Top-N queries, always use measure endpoint
-                       if _, isTopN := 
parsed.Statement.(*bydbql.TopNStatement); isTopN {
-                               Expect(endpoint).To(Equal(topnQueryPath))
-                       } else if expectedEndpoint, ok := 
expectedEndpoints[resourceType]; ok {
-                               Expect(endpoint).To(Equal(expectedEndpoint))
-                       }
-
-                       // Step 6: Validate YAML structure
-                       var yamlMap map[string]interface{}
-                       err = yaml.Unmarshal(yamlData, &yamlMap)
-                       Expect(err).NotTo(HaveOccurred())
-
-                       // Basic validation: should have name or resource 
identifier
-                       Expect(yamlMap["Name"] != nil || yamlMap["topN"] != 
nil).To(BeTrue())
-
-                       GinkgoWriter.Printf("Successfully processed query: %s", 
query)
-                       GinkgoWriter.Printf("Generated YAML length: %d bytes", 
len(yamlData))
-                       GinkgoWriter.Printf("Target endpoint: %s", endpoint)
-               },
-               Entry("stream query full pipeline",
-                       "SELECT trace_id, service_id FROM STREAM sw WHERE 
service_id = 'webapp' TIME > '-30m' LIMIT 100",
-                       bydbql.ResourceTypeStream),
-               Entry("measure query full pipeline",
-                       "SELECT region, AVG(latency) FROM MEASURE 
service_metrics WHERE service = 'auth' GROUP BY region",
-                       bydbql.ResourceTypeMeasure),
-               Entry("topn query full pipeline",
-                       "SHOW TOP 5 FROM MEASURE error_count WHERE status_code 
= '500'",
-                       bydbql.ResourceTypeMeasure),
-               Entry("trace query full pipeline",
-                       "SELECT * FROM TRACE app_traces WHERE operation_name = 
'GET /api/users' TIME BETWEEN '-1h' AND 'now'",
-                       bydbql.ResourceTypeTrace),
-               Entry("property query full pipeline",
-                       "SELECT ip, region FROM PROPERTY server_info WHERE ID = 
'server-1'",
-                       bydbql.ResourceTypeProperty),
-       )
-})
-
-var _ = Describe("Command Line Flags", func() {
-       var addr string
-       var deferFunc func()
-       var rootCmd *cobra.Command
-
-       BeforeEach(func() {
-               _, addr, deferFunc = setup.EmptyStandalone()
-               addr = httpSchema + addr
-
-               // Reset viper for clean test state
-               viper.Reset()
-               viper.Set("addr", addr)
-               viper.Set("group", "default")
-
-               // Create root command with all subcommands
-               rootCmd = &cobra.Command{Use: "root"}
-               RootCmdFlags(rootCmd)
-       })
-
-       AfterEach(func() {
-               deferFunc()
-       })
-
-       DescribeTable("handles command line flags correctly",
-               func(args []string, expectError bool) {
-                       // Prepend "bydbql" to the args since we're testing the 
bydbql subcommand
-                       fullArgs := append([]string{"bydbql"}, args...)
-                       rootCmd.SetArgs(fullArgs)
-
-                       // We're testing command structure, not execution
-                       // So we expect parsing to succeed even if execution 
might fail
-                       err := rootCmd.ParseFlags(fullArgs)
-
-                       if expectError {
-                               Expect(err).To(HaveOccurred())
-                       } else {
-                               Expect(err).NotTo(HaveOccurred())
-                       }
-               },
-               Entry("valid query command with file", []string{"query", "-f", 
"test.sql"}, false),
-               Entry("valid interactive command", []string{"interactive"}, 
false),
-               Entry("alias i for interactive", []string{"i"}, false),
-       )
-})
-
-var _ = Describe("BydbQL Query Execution", func() {
-       var addr string
-       var deferFunc func()
-       var rootCmd *cobra.Command
-
-       BeforeEach(func() {
-               _, addr, deferFunc = setup.EmptyStandalone()
-               addr = httpSchema + addr
-
-               // Reset viper for clean test state
-               viper.Reset()
-               viper.Set("addr", addr)
-               viper.Set("group", "default")
-
-               // Create root command with all subcommands
-               rootCmd = &cobra.Command{Use: "root"}
-               RootCmdFlags(rootCmd)
-       })
-
-       AfterEach(func() {
-               deferFunc()
-       })
-
-       It("executes simple query successfully", func() {
-               // Test a simple query execution
-               rootCmd.SetArgs([]string{"bydbql", "query", "-f", "-"})
-               rootCmd.SetIn(strings.NewReader("SELECT * FROM STREAM sw LIMIT 
1"))
-
-               executeQuery := func() string {
-                       return capturer.CaptureStdout(func() {
-                               err := rootCmd.Execute()
-                               if err != nil {
-                                       GinkgoWriter.Printf("execution fails: 
%v", err)
-                               }
-                       })
-               }
-
-               Eventually(executeQuery, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("error"))
-       })
-})
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index 3608128f..10af7be7 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -203,8 +203,8 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader) 
(requests []reqBody, err er
                }
                startTS = endTS.Add(-timeRange)
        }
-       s := startTS.Format(time.RFC3339)
-       e := endTS.Format(time.RFC3339)
+       s := startTS.Format("2006-01-02T15:04:05-07:00")
+       e := endTS.Format("2006-01-02T15:04:05-07:00")
        var rawRequests []reqBody
        if rawRequests, err = parseNameAndGroupFromYAML(reader); err != nil {
                return nil, err
diff --git a/bydbctl/internal/cmd/stream_test.go 
b/bydbctl/internal/cmd/stream_test.go
index 4423e134..687e396b 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -237,6 +237,7 @@ projection:
                Eventually(issue, 
flags.EventuallyTimeout).ShouldNot(ContainSubstring("code:"))
                Eventually(func() int {
                        out := issue()
+                       GinkgoWriter.Println(out)
                        resp := new(streamv1.QueryResponse)
                        helpers.UnmarshalYAML([]byte(out), resp)
                        GinkgoWriter.Println(resp)
diff --git a/pkg/bydbql/bydbql_test.go b/pkg/bydbql/bydbql_test.go
index 57e8d8f8..d993b085 100644
--- a/pkg/bydbql/bydbql_test.go
+++ b/pkg/bydbql/bydbql_test.go
@@ -202,31 +202,31 @@ var _ = Describe("Translator", func() {
                Entry("simple stream query",
                        "SELECT * FROM STREAM sw",
                        func(data map[string]any) bool {
-                               return data["Name"] == "sw"
+                               return data["name"] == "sw"
                        }),
                Entry("measure query with aggregation",
                        "SELECT region, SUM(latency) FROM MEASURE metrics GROUP 
BY region",
                        func(data map[string]any) bool {
-                               agg, ok := data["Agg"].(map[string]any)
-                               return ok && agg["Function"] == "SUM" && 
agg["FieldName"] == "latency"
+                               agg, ok := data["agg"].(map[string]any)
+                               return ok && agg["function"] == "SUM" && 
agg["field_name"] == "latency"
                        }),
                Entry("time range query",
                        "SELECT * FROM STREAM sw TIME BETWEEN 
'2023-01-01T00:00:00Z' AND '2023-01-02T00:00:00Z'",
                        func(data map[string]any) bool {
-                               timeRange, ok := 
data["TimeRange"].(map[string]any)
-                               return ok && timeRange["Begin"] == 
"2023-01-01T00:00:00Z" &&
-                                       timeRange["End"] == 
"2023-01-02T00:00:00Z"
+                               timeRange, ok := 
data["timeRange"].(map[string]any)
+                               return ok && timeRange["begin"] == 
"2023-01-01T00:00:00Z" &&
+                                       timeRange["end"] == 
"2023-01-02T00:00:00Z"
                        }),
                Entry("relative time query",
                        "SELECT * FROM STREAM sw TIME > '-30m'",
                        func(data map[string]any) bool {
-                               timeRange, ok := 
data["TimeRange"].(map[string]any)
-                               return ok && timeRange["Begin"] != nil && 
timeRange["End"] != nil
+                               timeRange, ok := 
data["timeRange"].(map[string]any)
+                               return ok && timeRange["begin"] != nil && 
timeRange["end"] != nil
                        }),
                Entry("WHERE clause with criteria",
                        "SELECT * FROM STREAM sw WHERE service_id = 'webapp'",
                        func(data map[string]any) bool {
-                               criteria, ok := data["Criteria"].([]any)
+                               criteria, ok := data["criteria"].([]any)
                                if !ok || len(criteria) == 0 {
                                        return false
                                }
@@ -236,20 +236,49 @@ var _ = Describe("Translator", func() {
                Entry("TOP N query",
                        "SHOW TOP 10 FROM MEASURE service_latency ORDER BY 
value DESC",
                        func(data map[string]any) bool {
-                               topN := data["TopN"]
-                               fieldValueSort := data["FieldValueSort"]
-                               return topN == float64(10) && fieldValueSort == 
"DESC"
+                               topN := data["top_n"]
+                               fieldValueSort := data["field_value_sort"]
+                               return topN == 10 && fieldValueSort == "DESC"
                        }),
                Entry("property query with IDs",
                        "SELECT * FROM PROPERTY metadata WHERE ID = 'id1' OR ID 
= 'id2'",
                        func(data map[string]any) bool {
-                               criteria, ok := data["Criteria"].([]any)
+                               criteria, ok := data["criteria"].([]any)
                                return ok && len(criteria) == 2
                        }),
                Entry("query trace enabled",
                        "SELECT * FROM STREAM sw WITH QUERY_TRACE",
                        func(data map[string]any) bool {
-                               return data["Trace"] == true
+                               return data["trace"] == true
+                       }),
+               Entry("stream query with projection tagFamilies format",
+                       "SELECT trace_id FROM STREAM sw",
+                       func(data map[string]any) bool {
+                               // Verify that projection is translated to 
tagFamilies format with snake_case
+                               projection, ok := 
data["projection"].(map[string]any)
+                               if !ok {
+                                       return false
+                               }
+                               tagFamiliesRaw := projection["tagFamilies"]
+                               tagFamilies, ok := 
tagFamiliesRaw.([]interface{})
+                               if !ok || len(tagFamilies) == 0 {
+                                       return false
+                               }
+                               tagFamily, ok := 
tagFamilies[0].(map[string]interface{})
+                               if !ok {
+                                       return false
+                               }
+                               name, ok := tagFamily["name"].(string)
+                               if !ok || name != "searchable" {
+                                       return false
+                               }
+                               tagsRaw := tagFamily["tags"]
+                               tags, ok := tagsRaw.([]interface{})
+                               if !ok || len(tags) == 0 {
+                                       return false
+                               }
+                               tag, ok := tags[0].(string)
+                               return ok && tag == "trace_id"
                        }),
        )
 })
@@ -370,15 +399,15 @@ var _ = Describe("Time Format Parsing", func() {
                Entry("absolute time range",
                        "BETWEEN '2023-01-01T10:00:00Z' AND 
'2023-01-01T11:00:00Z'",
                        func(data map[string]any) bool {
-                               timeRange, ok := 
data["TimeRange"].(map[string]any)
-                               return ok && timeRange["Begin"] == 
"2023-01-01T10:00:00Z" &&
-                                       timeRange["End"] == 
"2023-01-01T11:00:00Z"
+                               timeRange, ok := 
data["timeRange"].(map[string]any)
+                               return ok && timeRange["begin"] == 
"2023-01-01T10:00:00Z" &&
+                                       timeRange["end"] == 
"2023-01-01T11:00:00Z"
                        }),
                Entry("relative time condition",
                        "> '-30m'",
                        func(data map[string]any) bool {
-                               timeRange, ok := 
data["TimeRange"].(map[string]any)
-                               return ok && timeRange["Begin"] != nil && 
timeRange["End"] != nil
+                               timeRange, ok := 
data["timeRange"].(map[string]any)
+                               return ok && timeRange["begin"] != nil && 
timeRange["end"] != nil
                        }),
        )
 })
diff --git a/pkg/bydbql/translator.go b/pkg/bydbql/translator.go
index ca41b8fd..df25f9eb 100644
--- a/pkg/bydbql/translator.go
+++ b/pkg/bydbql/translator.go
@@ -24,38 +24,38 @@ import (
        "time"
 
        str2duration "github.com/xhit/go-str2duration/v2"
-       "sigs.k8s.io/yaml"
+       "gopkg.in/yaml.v3"
 )
 
 // QueryYAML represents the YAML structure for BanyanDB queries
 type QueryYAML struct {
        // Common fields
-       Name       string                 `yaml:"name,omitempty"`
-       Groups     []string               `yaml:"groups,omitempty"`
-       TimeRange  *TimeRangeYAML         `yaml:"timeRange,omitempty"`
-       Criteria   []map[string]interface{} `yaml:"criteria,omitempty"`
-       Limit      *int                   `yaml:"limit,omitempty"`
-       Offset     *int                   `yaml:"offset,omitempty"`
-       Trace      bool                   `yaml:"trace,omitempty"`
+       Name      string                   `yaml:"name,omitempty" 
json:"name,omitempty"`
+       Groups    []string                 `yaml:"groups,omitempty" 
json:"groups,omitempty"`
+       TimeRange *TimeRangeYAML           `yaml:"timeRange,omitempty" 
json:"timeRange,omitempty"`
+       Criteria  []map[string]interface{} `yaml:"criteria,omitempty" 
json:"criteria,omitempty"`
+       Limit     *int                     `yaml:"limit,omitempty" 
json:"limit,omitempty"`
+       Offset    *int                     `yaml:"offset,omitempty" 
json:"offset,omitempty"`
+       Trace     bool                     `yaml:"trace,omitempty" 
json:"trace,omitempty"`
 
        // Stream/Trace specific
-       Projection []string               `yaml:"projection,omitempty"`
-       OrderBy    *OrderByYAML           `yaml:"orderBy,omitempty"`
+       Projection interface{}  `yaml:"projection,omitempty" 
json:"projection,omitempty"`
+       OrderBy    *OrderByYAML `yaml:"order_by,omitempty" 
json:"order_by,omitempty"`
 
        // Measure specific
-       TagProjection   []string              `yaml:"tagProjection,omitempty"`
-       FieldProjection []string              `yaml:"fieldProjection,omitempty"`
-       GroupBy         *GroupByYAML          `yaml:"groupBy,omitempty"`
-       Agg             *AggregationYAML      `yaml:"agg,omitempty"`
-       Top             *TopYAML              `yaml:"top,omitempty"`
+       TagProjection   []string         `yaml:"tag_projection,omitempty" 
json:"tag_projection,omitempty"`
+       FieldProjection []string         `yaml:"field_projection,omitempty" 
json:"field_projection,omitempty"`
+       GroupBy         *GroupByYAML     `yaml:"group_by,omitempty" 
json:"group_by,omitempty"`
+       Agg             *AggregationYAML `yaml:"agg,omitempty" 
json:"agg,omitempty"`
+       Top             *TopYAML         `yaml:"top,omitempty" 
json:"top,omitempty"`
 
        // Property specific
-       IDs []string `yaml:"ids,omitempty"`
+       IDs []string `yaml:"ids,omitempty" json:"ids,omitempty"`
 
        // Top-N specific (for separate endpoint)
-       TopN              int                `yaml:"topN,omitempty"`
-       FieldValueSort    string             `yaml:"fieldValueSort,omitempty"`
-       Conditions        []map[string]interface{} `yaml:"conditions,omitempty"`
+       TopN           int                      `yaml:"top_n,omitempty" 
json:"top_n,omitempty"`
+       FieldValueSort string                   
`yaml:"field_value_sort,omitempty" json:"field_value_sort,omitempty"`
+       Conditions     []map[string]interface{} `yaml:"conditions,omitempty" 
json:"conditions,omitempty"`
 }
 
 // TimeRangeYAML represents time range in YAML
@@ -66,26 +66,37 @@ type TimeRangeYAML struct {
 
 // OrderByYAML represents ORDER BY clause in YAML
 type OrderByYAML struct {
-       IndexRuleName string `yaml:"indexRuleName"`
+       IndexRuleName string `yaml:"index_rule_name"`
        Sort          string `yaml:"sort"` // ASC or DESC
 }
 
 // GroupByYAML represents GROUP BY clause in YAML
 type GroupByYAML struct {
-       TagProjection []string `yaml:"tagProjection"`
+       TagProjection []string `yaml:"tag_projection"`
+}
+
+// TagFamilyYAML represents a tag family in projection
+type TagFamilyYAML struct {
+       Name string   `yaml:"name"`
+       Tags []string `yaml:"tags"`
+}
+
+// ProjectionYAML represents the projection structure for stream/trace queries
+type ProjectionYAML struct {
+       TagFamilies []TagFamilyYAML `yaml:"tagFamilies"`
 }
 
 // AggregationYAML represents aggregation function in YAML
 type AggregationYAML struct {
-       Function   string `yaml:"function"`
-       FieldName  string `yaml:"fieldName"`
+       Function  string `yaml:"function"`
+       FieldName string `yaml:"field_name"`
 }
 
 // TopYAML represents TOP N clause in YAML
 type TopYAML struct {
-       Number        int      `yaml:"number"`
-       FieldName     string   `yaml:"fieldName"`
-       FieldValueSort string  `yaml:"fieldValueSort"`
+       Number         int    `yaml:"number"`
+       FieldName      string `yaml:"field_name"`
+       FieldValueSort string `yaml:"field_value_sort"`
 }
 
 // Translator converts parsed BydbQL to YAML format
@@ -209,10 +220,20 @@ func (t *Translator) translateStreamOrTraceQuery(stmt 
*SelectStatement, yamlQuer
                        // SELECT * - no specific projection needed
                } else if stmt.Projection.Empty {
                        // SELECT () - empty projection for traces
-                       yamlQuery.Projection = []string{}
+                       yamlQuery.Projection = &ProjectionYAML{
+                               TagFamilies: []TagFamilyYAML{},
+                       }
                } else if len(stmt.Projection.Columns) > 0 {
+                       // Group columns by tag family (default to "searchable" 
for now)
+                       tagFamily := TagFamilyYAML{
+                               Name: "searchable",
+                               Tags: make([]string, 0, 
len(stmt.Projection.Columns)),
+                       }
                        for _, col := range stmt.Projection.Columns {
-                               yamlQuery.Projection = 
append(yamlQuery.Projection, col.Name)
+                               tagFamily.Tags = append(tagFamily.Tags, 
col.Name)
+                       }
+                       yamlQuery.Projection = &ProjectionYAML{
+                               TagFamilies: []TagFamilyYAML{tagFamily},
                        }
                }
        }
@@ -301,9 +322,11 @@ func (t *Translator) translateMeasureQuery(stmt 
*SelectStatement, yamlQuery *Que
 func (t *Translator) translatePropertyQuery(stmt *SelectStatement, yamlQuery 
*QueryYAML) (*QueryYAML, error) {
        // Translate projection
        if stmt.Projection != nil && !stmt.Projection.All && 
len(stmt.Projection.Columns) > 0 {
+               projection := make([]string, 0, len(stmt.Projection.Columns))
                for _, col := range stmt.Projection.Columns {
-                       yamlQuery.Projection = append(yamlQuery.Projection, 
col.Name)
+                       projection = append(projection, col.Name)
                }
+               yamlQuery.Projection = projection
        }
 
        // Extract IDs from WHERE clause if present
@@ -588,4 +611,4 @@ func TranslateQuery(query string, context *QueryContext) 
([]byte, []string, erro
        }
 
        return yamlBytes, nil, nil
-}
\ No newline at end of file
+}

Reply via email to