This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ebdbb62  add measure and group cmd (#180)
ebdbb62 is described below

commit ebdbb62c066223a9e9b106da0c60ea4e50f559b2
Author: sacloud <[email protected]>
AuthorDate: Sun Oct 9 22:36:36 2022 +0800

    add measure and group cmd (#180)
    
    * add measure and group cmd
    
    * Add metadata verification
    
    * Implement grpc health check client
    
    * Handle grpc error
    
    * add start and end flag
    
    Signed-off-by: Gao Hongtao <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/common/id.go                                   |  13 ++
 banyand/liaison/grpc/measure.go                    |  13 +-
 banyand/liaison/grpc/stream.go                     |  13 +-
 banyand/liaison/http/health.go                     |  42 +++-
 banyand/liaison/http/server.go                     |  37 ++--
 banyand/query/processor.go                         |  33 ++-
 bydbctl/internal/cmd/group.go                      |  66 +++++-
 bydbctl/internal/cmd/group_test.go                 | 150 ++++++++++++++
 bydbctl/internal/cmd/{stream.go => measure.go}     |  74 +++----
 bydbctl/internal/cmd/measure_test.go               | 220 ++++++++++++++++++++
 bydbctl/internal/cmd/rest.go                       |  98 ++++++++-
 bydbctl/internal/cmd/root.go                       |  11 +-
 bydbctl/internal/cmd/stream.go                     |  11 +-
 bydbctl/internal/cmd/stream_test.go                | 230 ++++++++++++---------
 dist/LICENSE                                       |   2 +-
 ...license-github.com-xhit-go-str2duration-v2.txt} |  27 +--
 go.mod                                             |   2 +-
 go.sum                                             |   3 +-
 pkg/test/helpers/http_health.go                    |   3 +
 pkg/test/setup/setup.go                            |  10 +-
 test/cases/stream/{stream.go => data/data.go}      |  87 +++-----
 test/cases/stream/{ => data}/input/all.yaml        |   0
 test/cases/stream/{ => data}/input/filter_tag.yaml |   0
 .../stream/{ => data}/input/filter_tag_empty.yaml  |   0
 .../stream/{ => data}/input/global_index.yaml      |   0
 test/cases/stream/{ => data}/input/having.yaml     |   0
 .../stream/{ => data}/input/indexed_only.yaml      |   0
 test/cases/stream/{ => data}/input/less.yaml       |   0
 test/cases/stream/{ => data}/input/less_eq.yaml    |   0
 test/cases/stream/{ => data}/input/limit.yaml      |   0
 test/cases/stream/{ => data}/input/logical.yaml    |   0
 test/cases/stream/{ => data}/input/offset.yaml     |   0
 test/cases/stream/{ => data}/input/search.yaml     |   0
 test/cases/stream/{ => data}/input/sort_desc.yaml  |   0
 test/cases/stream/{ => data}/testdata/data.json    |   0
 test/cases/stream/{ => data}/want/all.yaml         |   0
 test/cases/stream/{ => data}/want/filter_tag.yaml  |   0
 .../cases/stream/{ => data}/want/global_index.yaml |   0
 test/cases/stream/{ => data}/want/having.yaml      |   0
 .../cases/stream/{ => data}/want/indexed_only.yaml |   0
 test/cases/stream/{ => data}/want/less.yaml        |   0
 test/cases/stream/{ => data}/want/less_eq.yaml     |   0
 test/cases/stream/{ => data}/want/limit.yaml       |   0
 test/cases/stream/{ => data}/want/logical.yaml     |   0
 test/cases/stream/{ => data}/want/offset.yaml      |   0
 test/cases/stream/{ => data}/want/search.yaml      |   0
 test/cases/stream/{ => data}/want/sort_desc.yaml   |   0
 test/cases/stream/stream.go                        | 134 +-----------
 test/integration/cold_query/query_suite_test.go    |  45 ++--
 test/integration/other/measure_test.go             |   2 +-
 test/integration/other/property_test.go            |   4 +-
 test/integration/other/tls_test.go                 |   2 +-
 test/integration/query/query_suite_test.go         |  29 +--
 53 files changed, 900 insertions(+), 461 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index 7443083..e106b54 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -19,6 +19,7 @@ package common
 
 import (
        "context"
+       "fmt"
 
        "github.com/prometheus/client_golang/prometheus"
 
@@ -69,3 +70,15 @@ func SetPosition(ctx context.Context, fn func(p Position) 
Position) context.Cont
        }
        return context.WithValue(ctx, PositionKey, fn(p))
 }
+
+type Error struct {
+       msg string
+}
+
+func NewError(tpl string, args ...any) Error {
+       return Error{msg: fmt.Sprintf(tpl, args...)}
+}
+
+func (e Error) Msg() string {
+       return e.msg
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index c77a4b5..5cc53c9 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -22,9 +22,11 @@ import (
        "io"
        "time"
 
+       "github.com/pkg/errors"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -99,11 +101,14 @@ func (ms *measureService) Query(_ context.Context, 
entityCriteria *measurev1.Que
        if errFeat != nil {
                return nil, errFeat
        }
-       queryMsg, ok := msg.Data().([]*measurev1.DataPoint)
-       if !ok {
-               return nil, ErrQueryMsg
+       data := msg.Data()
+       switch d := data.(type) {
+       case []*measurev1.DataPoint:
+               return &measurev1.QueryResponse{DataPoints: d}, nil
+       case common.Error:
+               return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
        }
-       return &measurev1.QueryResponse{DataPoints: queryMsg}, nil
+       return nil, ErrQueryMsg
 }
 
 // TODO: implement topN
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index cc61a8f..8cf6e1b 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -22,9 +22,11 @@ import (
        "io"
        "time"
 
+       "github.com/pkg/errors"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -103,9 +105,12 @@ func (s *streamService) Query(_ context.Context, 
entityCriteria *streamv1.QueryR
        if errFeat != nil {
                return nil, errFeat
        }
-       queryMsg, ok := msg.Data().([]*streamv1.Element)
-       if !ok {
-               return nil, ErrQueryMsg
+       data := msg.Data()
+       switch d := data.(type) {
+       case []*streamv1.Element:
+               return &streamv1.QueryResponse{Elements: d}, nil
+       case common.Error:
+               return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
        }
-       return &streamv1.QueryResponse{Elements: queryMsg}, nil
+       return nil, ErrQueryMsg
 }
diff --git a/banyand/liaison/http/health.go b/banyand/liaison/http/health.go
index 8adf613..3912c7b 100644
--- a/banyand/liaison/http/health.go
+++ b/banyand/liaison/http/health.go
@@ -19,17 +19,55 @@ package http
 
 import (
        "context"
+       "time"
 
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/health/grpc_health_v1"
        "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-type healthCheckClient struct{}
+func newHealthCheckClient(ctx context.Context, l *logger.Logger, addr string, 
opts []grpc.DialOption) (client *healthCheckClient, err error) {
+       conn, err := grpc.Dial(addr, opts...)
+       if err != nil {
+               return nil, err
+       }
+       defer func() {
+               if err != nil {
+                       if cerr := conn.Close(); cerr != nil {
+                               l.Info().Str("addr", 
addr).Err(cerr).Msg("Failed to close conn")
+                       }
+                       return
+               }
+               go func() {
+                       <-ctx.Done()
+                       if cerr := conn.Close(); cerr != nil {
+                               l.Info().Str("addr", 
addr).Err(cerr).Msg("Failed to close conn")
+                       }
+               }()
+       }()
+       return &healthCheckClient{conn: conn}, nil
+}
+
+type healthCheckClient struct {
+       conn *grpc.ClientConn
+}
 
 func (g *healthCheckClient) Check(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(*grpc_health_v1.HealthCheckResponse, error) {
-       return &grpc_health_v1.HealthCheckResponse{Status: 
grpc_health_v1.HealthCheckResponse_SERVING}, nil
+       var resp *grpc_health_v1.HealthCheckResponse
+       if err := grpchelper.Request(context.Background(), 10*time.Second, 
func(rpcCtx context.Context) (err error) {
+               resp, err = grpc_health_v1.NewHealthClient(g.conn).Check(rpcCtx,
+                       &grpc_health_v1.HealthCheckRequest{
+                               Service: "",
+                       })
+               return err
+       }); err != nil {
+               return nil, err
+       }
+       return resp, nil
 }
 
 func (g *healthCheckClient) Watch(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(grpc_health_v1.Health_WatchClient, error) {
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index b078211..7800296 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -67,7 +67,7 @@ type service struct {
 func (p *service) FlagSet() *run.FlagSet {
        flagSet := run.NewFlagSet("")
        flagSet.StringVar(&p.listenAddr, "http-addr", ":17913", "listen addr 
for http")
-       flagSet.StringVar(&p.grpcAddr, "grcp-addr", "localhost:17912", "the 
grpc addr")
+       flagSet.StringVar(&p.grpcAddr, "grpc-addr", "localhost:17912", "the 
grpc addr")
        return flagSet
 }
 
@@ -92,27 +92,26 @@ func (p *service) PreRun() error {
        serveIndex := serveFileContents("index.html", httpFS)
        p.mux.Mount("/", intercept404(fileServer, serveIndex))
 
-       gwMux := 
runtime.NewServeMux(runtime.WithHealthzEndpoint(&healthCheckClient{}))
        var ctx context.Context
        ctx, p.clientCloser = context.WithCancel(context.Background())
-
+       opts := []grpc.DialOption{
+               // TODO: add TLS
+               grpc.WithTransportCredentials(insecure.NewCredentials()),
+       }
+       client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts)
+       if err != nil {
+               return err
+       }
+       gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
        err = multierr.Combine(
-               
database_v1.RegisterStreamRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               
database_v1.RegisterMeasureRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               
database_v1.RegisterIndexRuleRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               
database_v1.RegisterIndexRuleBindingRegistryServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               
database_v1.RegisterGroupRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               stream_v1.RegisterStreamServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               measure_v1.RegisterMeasureServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-               property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr,
-                       
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
+               
database_v1.RegisterStreamRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr, opts),
+               
database_v1.RegisterMeasureRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr, opts),
+               
database_v1.RegisterIndexRuleRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr, opts),
+               
database_v1.RegisterIndexRuleBindingRegistryServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr, opts),
+               
database_v1.RegisterGroupRegistryServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr, opts),
+               stream_v1.RegisterStreamServiceHandlerFromEndpoint(ctx, gwMux, 
p.grpcAddr, opts),
+               measure_v1.RegisterMeasureServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr, opts),
+               property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx, 
gwMux, p.grpcAddr, opts),
        )
        if err != nil {
                return err
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index c5ba8ff..64d6146 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -23,6 +23,7 @@ import (
 
        "go.uber.org/multierr"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -63,9 +64,10 @@ type streamQueryProcessor struct {
 }
 
 func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+       now := time.Now().UnixNano()
        queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
        if !ok {
-               p.log.Warn().Msg("invalid event data type")
+               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
                return
        }
        p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query 
request")
@@ -73,27 +75,25 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        meta := queryCriteria.GetMetadata()
        ec, err := p.streamService.Stream(meta)
        if err != nil {
-               p.log.Error().Err(err).
-                       Str("stream", meta.GetName()).
-                       Msg("fail to get execution context for stream")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to get execution context for stream %s: %v", meta.GetName(), err))
                return
        }
 
        analyzer, err := 
logical_stream.CreateAnalyzerFromMetaService(p.metaService)
        if err != nil {
-               p.log.Error().Err(err).Msg("fail to build analyzer")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build analyzer for stream %s: %v", meta.GetName(), err))
                return
        }
 
        s, err := analyzer.BuildSchema(context.TODO(), meta)
        if err != nil {
-               p.log.Error().Err(err).Msg("fail to build")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build schema for stream %s: %v", meta.GetName(), err))
                return
        }
 
        plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
        if err != nil {
-               p.log.Error().Err(err).Msg("fail to analyze the query request")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for stream %s: %v", meta.GetName(), err))
                return
        }
 
@@ -101,11 +101,10 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
 
        entities, err := plan.(executor.StreamExecutable).Execute(ec)
        if err != nil {
-               p.log.Error().Err(err).Msg("fail to execute the query plan")
+               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("execute the query plan for stream %s: %v", meta.GetName(), 
err))
                return
        }
 
-       now := time.Now().UnixNano()
        resp = bus.NewMessage(bus.MessageID(now), entities)
 
        return
@@ -118,8 +117,9 @@ type measureQueryProcessor struct {
 
 func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
+       now := time.Now().UnixNano()
        if !ok {
-               p.queryService.log.Warn().Msg("invalid event data type")
+               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
                return
        }
        p.log.Debug().Msg("received a query event")
@@ -127,27 +127,25 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        meta := queryCriteria.GetMetadata()
        ec, err := p.measureService.Measure(meta)
        if err != nil {
-               p.log.Error().Err(err).
-                       Str("measure", meta.GetName()).
-                       Msg("fail to get execution context")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to get execution context for measure %s: %v", meta.GetName(), err))
                return
        }
 
        analyzer, err := 
logical_measure.CreateAnalyzerFromMetaService(p.metaService)
        if err != nil {
-               p.log.Error().Err(err).Msg("fail to build analyzer")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build analyzer for measure %s: %v", meta.GetName(), err))
                return
        }
 
        s, err := analyzer.BuildSchema(context.TODO(), meta)
        if err != nil {
-               p.queryService.log.Error().Err(err).Msg("fail to build measure 
schema")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to build schema for measure %s: %v", meta.GetName(), err))
                return
        }
 
        plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
        if err != nil {
-               p.queryService.log.Error().Err(err).Msg("fail to analyze the 
query request")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for measure %s: %v", meta.GetName(), err))
                return
        }
 
@@ -155,7 +153,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
 
        mIterator, err := plan.(executor.MeasureExecutable).Execute(ec)
        if err != nil {
-               p.queryService.log.Error().Err(err).Msg("fail to execute the 
query plan")
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the query plan for measure %s: %v", meta.GetName(), err))
                return
        }
        defer func() {
@@ -170,7 +168,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
                        result = append(result, current[0])
                }
        }
-       now := time.Now().UnixNano()
        resp = bus.NewMessage(bus.MessageID(now), result)
        return
 }
diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go
index cf5e5d0..ba4b0e4 100644
--- a/bydbctl/internal/cmd/group.go
+++ b/bydbctl/internal/cmd/group.go
@@ -65,15 +65,68 @@ func newGroupCmd() *cobra.Command {
                                })
                },
        }
-       bindFileFlag(createCmd)
+
+       updateCmd := &cobra.Command{
+               Use:     "update -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Update groups from files",
+               RunE: func(cmd *cobra.Command, args []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseNameFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       g := new(common_v1.Group)
+                                       err := 
protojson.Unmarshal(request.data, g)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&database_v1.GroupRegistryServiceUpdateRequest{
+                                               Group: g,
+                                       }
+                                       b, err := json.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return 
request.req.SetBody(b).SetPathParam("group", 
request.name).Put(getPath("/api/v1/group/schema/{group}"))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("group %s is updated", 
reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               })
+               },
+       }
+       bindFileFlag(createCmd, updateCmd)
+
+       getCmd := &cobra.Command{
+               Use:     "get [-g group]",
+               Version: version.Build(),
+               Short:   "Get a group",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/group/schema/{group}"))
+                       }, yamlPrinter)
+               },
+       }
+
+       deleteCmd := &cobra.Command{
+               Use:     "delete [-g group]",
+               Version: version.Build(),
+               Short:   "Delete a group",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("group", 
request.group).Delete(getPath("/api/v1/group/schema/{group}"))
+                       },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("group %s is deleted", 
reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               })
+               },
+       }
 
        listCmd := &cobra.Command{
                Use:     "list",
                Version: version.Build(),
-               Short:   "list all groups",
-               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
-                       return cmd.Parent().PersistentPreRunE(cmd.Parent(), 
args)
-               },
+               Short:   "List all groups",
                RunE: func(cmd *cobra.Command, args []string) (err error) {
                        return rest(nil, func(request request) 
(*resty.Response, error) {
                                return 
request.req.Get(getPath("/api/v1/group/schema/lists"))
@@ -81,7 +134,6 @@ func newGroupCmd() *cobra.Command {
                },
        }
 
-       // todo:GroupGetCmd, GroupUpdateCmd, GroupDeleteCmd
-       groupCmd.AddCommand(createCmd, listCmd)
+       groupCmd.AddCommand(createCmd, updateCmd, listCmd, getCmd, deleteCmd)
        return groupCmd
 }
diff --git a/bydbctl/internal/cmd/group_test.go 
b/bydbctl/internal/cmd/group_test.go
new file mode 100644
index 0000000..9362e27
--- /dev/null
+++ b/bydbctl/internal/cmd/group_test.go
@@ -0,0 +1,150 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+       "strings"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/spf13/cobra"
+       "github.com/zenizh/go-capturer"
+
+       database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+var _ = Describe("Group", func() {
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       BeforeEach(func() {
+               _, addr, deferFunc = setup.SetUp()
+               Eventually(helpers.HTTPHealthCheck(addr), 
10*time.Second).Should(Succeed())
+               addr = "http://"; + addr
+               // extracting the operation of creating group
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+  shard_num: 2
+  block_interval:
+    unit: UNIT_HOUR
+    num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group1 is created"))
+       })
+
+       It("get group", func() {
+               rootCmd.SetArgs([]string{"group", "get", "-g", "group1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.GroupRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Group.Metadata.Name).To(Equal("group1"))
+       })
+
+       It("update group", func() {
+               rootCmd.SetArgs([]string{"group", "update", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+  shard_num: 1
+  block_interval:
+    unit: UNIT_HOUR
+    num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group1 is updated"))
+       })
+
+       It("delete group", func() {
+               rootCmd.SetArgs([]string{"group", "delete", "-g", "group1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group1 is deleted"))
+       })
+
+       It("list group", func() {
+               // create another group for list operation
+               rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group2
+catalog: CATALOG_STREAM
+resource_opts:
+  shard_num: 2
+  block_interval:
+    unit: UNIT_HOUR
+    num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group2 is created"))
+               // list
+               rootCmd.SetArgs([]string{"group", "list"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.GroupRegistryServiceListResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Group).To(HaveLen(4))
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/measure.go
similarity index 69%
copy from bydbctl/internal/cmd/stream.go
copy to bydbctl/internal/cmd/measure.go
index 719ec32..9a66323 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/measure.go
@@ -23,51 +23,46 @@ import (
 
        "github.com/go-resty/resty/v2"
        "github.com/spf13/cobra"
-       "github.com/spf13/viper"
        "google.golang.org/protobuf/encoding/protojson"
 
        database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-const streamSchemaPath = "/api/v1/stream/schema"
+const measureSchemaPath = "/api/v1/measure/schema"
 
-var streamSchemaPathWithParams = streamSchemaPath + "/{group}/{name}"
+var measureSchemaPathWithParams = measureSchemaPath + "/{group}/{name}"
 
-func getPath(path string) string {
-       return viper.GetString("addr") + path
-}
-
-func newStreamCmd() *cobra.Command {
-       streamCmd := &cobra.Command{
-               Use:     "stream",
+func newMeasureCmd() *cobra.Command {
+       measureCmd := &cobra.Command{
+               Use:     "measure",
                Version: version.Build(),
-               Short:   "Stream operation",
+               Short:   "Measure operation",
        }
 
        createCmd := &cobra.Command{
                Use:     "create -f [file|dir|-]",
                Version: version.Build(),
-               Short:   "Create streams from files",
+               Short:   "Create measures from files",
                RunE: func(cmd *cobra.Command, _ []string) error {
                        return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       s := new(database_v1.Stream)
+                                       s := new(database_v1.Measure)
                                        err := 
protojson.Unmarshal(request.data, s)
                                        if err != nil {
                                                return nil, err
                                        }
-                                       cr := 
&database_v1.StreamRegistryServiceCreateRequest{
-                                               Stream: s,
+                                       cr := 
&database_v1.MeasureRegistryServiceCreateRequest{
+                                               Measure: s,
                                        }
                                        b, err := json.Marshal(cr)
                                        if err != nil {
                                                return nil, err
                                        }
-                                       return 
request.req.SetBody(b).Post(getPath(streamSchemaPath))
+                                       return 
request.req.SetBody(b).Post(getPath(measureSchemaPath))
                                },
                                func(_ int, reqBody reqBody, _ []byte) error {
-                                       fmt.Printf("stream %s.%s is created", 
reqBody.group, reqBody.name)
+                                       fmt.Printf("measure %s.%s is created", 
reqBody.group, reqBody.name)
                                        fmt.Println()
                                        return nil
                                })
@@ -77,17 +72,17 @@ func newStreamCmd() *cobra.Command {
        updateCmd := &cobra.Command{
                Use:     "update -f [file|dir|-]",
                Version: version.Build(),
-               Short:   "Update streams from files",
+               Short:   "Update measures from files",
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
                        return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       s := new(database_v1.Stream)
+                                       s := new(database_v1.Measure)
                                        err := 
protojson.Unmarshal(request.data, s)
                                        if err != nil {
                                                return nil, err
                                        }
-                                       cr := 
&database_v1.StreamRegistryServiceUpdateRequest{
-                                               Stream: s,
+                                       cr := 
&database_v1.MeasureRegistryServiceUpdateRequest{
+                                               Measure: s,
                                        }
                                        b, err := json.Marshal(cr)
                                        if err != nil {
@@ -95,24 +90,23 @@ func newStreamCmd() *cobra.Command {
                                        }
                                        return request.req.SetBody(b).
                                                SetPathParam("name", 
request.name).SetPathParam("group", request.group).
-                                               
Put(getPath(streamSchemaPathWithParams))
+                                               
Put(getPath(measureSchemaPathWithParams))
                                },
                                func(_ int, reqBody reqBody, _ []byte) error {
-                                       fmt.Printf("stream %s.%s is updated", 
reqBody.group, reqBody.name)
+                                       fmt.Printf("measure %s.%s is updated", 
reqBody.group, reqBody.name)
                                        fmt.Println()
                                        return nil
                                })
                },
        }
-       bindFileFlag(createCmd, updateCmd)
 
        getCmd := &cobra.Command{
                Use:     "get [-g group] -n name",
                Version: version.Build(),
-               Short:   "Get a stream",
+               Short:   "Get a measure",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Get(getPath(streamSchemaPathWithParams))
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Get(getPath(measureSchemaPathWithParams))
                        }, yamlPrinter)
                },
        }
@@ -120,12 +114,12 @@ func newStreamCmd() *cobra.Command {
        deleteCmd := &cobra.Command{
                Use:     "delete [-g group] -n name",
                Version: version.Build(),
-               Short:   "Delete a stream",
+               Short:   "Delete a measure",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Delete(getPath(streamSchemaPathWithParams))
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Delete(getPath(measureSchemaPathWithParams))
                        }, func(_ int, reqBody reqBody, _ []byte) error {
-                               fmt.Printf("stream %s.%s is deleted", 
reqBody.group, reqBody.name)
+                               fmt.Printf("measure %s.%s is deleted", 
reqBody.group, reqBody.name)
                                fmt.Println()
                                return nil
                        })
@@ -136,28 +130,28 @@ func newStreamCmd() *cobra.Command {
        listCmd := &cobra.Command{
                Use:     "list [-g group]",
                Version: version.Build(),
-               Short:   "List streams",
+               Short:   "List measures",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
                        return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
-                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
                        }, yamlPrinter)
                },
        }
 
        queryCmd := &cobra.Command{
-               Use:     "query",
+               Use:     "query -f [file|dir|-]",
                Version: version.Build(),
-               Short:   "Query data in a stream",
-               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
-                       return cmd.Parent().PersistentPreRunE(cmd.Parent(), 
args)
-               },
+               Short:   "Query data in a measure",
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
-                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                       return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
-                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
                                }, yamlPrinter)
                },
        }
-       streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
-       return streamCmd
+       bindFileFlag(createCmd, updateCmd, queryCmd)
+       bindTimeRangeFlag(queryCmd)
+
+       measureCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       return measureCmd
 }
diff --git a/bydbctl/internal/cmd/measure_test.go 
b/bydbctl/internal/cmd/measure_test.go
new file mode 100644
index 0000000..b7e79c3
--- /dev/null
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -0,0 +1,220 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+       "fmt"
+       "strings"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/spf13/cobra"
+       "github.com/zenizh/go-capturer"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measure_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = Describe("Measure Schema Operation", func() {
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       BeforeEach(func() {
+               _, addr, deferFunc = setup.SetUp()
+               Eventually(helpers.HTTPHealthCheck(addr), 
10*time.Second).Should(Succeed())
+               addr = "http://"; + addr
+               // extracting the operation of creating measure schema
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group1
+catalog: CATALOG_MEASURE
+resource_opts:
+  shard_num: 2
+  block_interval:
+    unit: UNIT_HOUR
+    num: 2
+  segment_interval:
+    unit: UNIT_DAY
+    num: 1
+  ttl:
+    unit: UNIT_DAY
+    num: 7`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group1 is created"))
+               rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f", 
"-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name1
+  group: group1`))
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("measure group1.name1 is 
created"))
+       })
+
+       It("get measure schema", func() {
+               rootCmd.SetArgs([]string{"measure", "get", "-g", "group1", 
"-n", "name1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.MeasureRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Measure.Metadata.Group).To(Equal("group1"))
+               Expect(resp.Measure.Metadata.Name).To(Equal("name1"))
+       })
+
+       It("update measure schema", func() {
+               rootCmd.SetArgs([]string{"measure", "update", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name1
+  group: group1
+entity:
+  tagNames: ["tag1"]`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("measure group1.name1 is 
updated"))
+               rootCmd.SetArgs([]string{"measure", "get", "-g", "group1", 
"-n", "name1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.MeasureRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Measure.Metadata.Group).To(Equal("group1"))
+               Expect(resp.Measure.Metadata.Name).To(Equal("name1"))
+               Expect(resp.Measure.Entity.TagNames[0]).To(Equal("tag1"))
+       })
+
+       It("delete measure schema", func() {
+               // delete
+               rootCmd.SetArgs([]string{"measure", "delete", "-g", "group1", 
"-n", "name1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("measure group1.name1 is 
deleted"))
+               // get again
+               rootCmd.SetArgs([]string{"measure", "get", "-g", "group1", 
"-n", "name1"})
+               err := rootCmd.Execute()
+               Expect(err).To(MatchError("rpc error: code = NotFound desc = 
banyandb: resource not found"))
+       })
+
+       It("list measure schema", func() {
+               // create another measure schema for list operation
+               rootCmd.SetArgs([]string{"measure", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name2
+  group: group1`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("measure group1.name2 is 
created"))
+               // list
+               rootCmd.SetArgs([]string{"measure", "list", "-g", "group1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.MeasureRegistryServiceListResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Measure).To(HaveLen(2))
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
+
+var _ = Describe("Measure Data Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       BeforeEach(func() {
+               grpcAddr, addr, deferFunc = setup.SetUp()
+               Eventually(helpers.HTTPHealthCheck(addr), 
10*time.Second).Should(Succeed())
+               addr = "http://"; + addr
+               time.Sleep(1 * time.Second)
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+       })
+
+       It("query measure data", func() {
+               conn, err := grpclib.Dial(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+               now := timestamp.NowMilli()
+               interval := 500 * time.Millisecond
+               end := now.Add(1 * time.Hour)
+               cases_measure_data.Write(conn, "service_cpm_minute", 
"sw_metric", "service_cpm_minute_data.json", now, interval)
+               rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+metadata:
+  name: service_cpm_minute
+  group: sw_metric
+timeRange:
+  begin: %s
+  end: %s
+tagProjection:
+  tagFamilies:
+    - name: default
+      tags:
+        - id`, now.Format(RFC3339), end.Format(RFC3339))))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       GinkgoWriter.Println(out)
+                       resp := new(measure_v1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.DataPoints)
+               }).Should(Equal(6))
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+})
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index f6e746a..7a791c9 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -21,21 +21,34 @@ import (
        "encoding/json"
        "fmt"
        "io"
+       "time"
 
        "github.com/go-resty/resty/v2"
        "github.com/pkg/errors"
        "github.com/spf13/viper"
+       str2duration "github.com/xhit/go-str2duration/v2"
+       "go.uber.org/multierr"
+       stpb "google.golang.org/genproto/googleapis/rpc/status"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "sigs.k8s.io/yaml"
 
        "github.com/apache/skywalking-banyandb/bydbctl/pkg/file"
 )
 
+const (
+       // RFC3339 refers to https://www.rfc-editor.org/rfc/rfc3339
+       RFC3339   = "2006-01-02T15:04:05Z07:00"
+       timeRange = 30 * time.Minute
+)
+
 var errMalformedInput = errors.New("malformed input")
 
 type reqBody struct {
-       name  string
-       group string
-       data  []byte
+       name       string
+       group      string
+       parsedData map[string]interface{}
+       data       []byte
 }
 
 type request struct {
@@ -86,10 +99,15 @@ func parseFromYAML(tryParseGroup bool, reader io.Reader) 
(requests []reqBody, er
                if !ok {
                        return nil, errors.WithMessage(errMalformedInput, 
"absent node: name in metadata")
                }
+               j, err = json.Marshal(data)
+               if err != nil {
+                       return nil, err
+               }
                requests = append(requests, reqBody{
-                       name:  name,
-                       group: group,
-                       data:  j,
+                       name:       name,
+                       group:      group,
+                       data:       j,
+                       parsedData: data,
                })
        }
        return requests, nil
@@ -111,6 +129,65 @@ func parseGroupFromFlags() ([]reqBody, error) {
        return []reqBody{{group: group}}, nil
 }
 
+func parseTimeRangeFromFlagAndYAML(reader io.Reader) (requests []reqBody, err 
error) {
+       var startTS, endTS time.Time
+       if start == "" && end == "" {
+               startTS = time.Now().Add((-30) * time.Minute)
+               endTS = time.Now()
+       } else if start != "" && end != "" {
+               if startTS, err = parseTime(start); err != nil {
+                       return nil, err
+               }
+               if endTS, err = parseTime(start); err != nil {
+                       return nil, err
+               }
+       } else if start != "" {
+               if startTS, err = parseTime(start); err != nil {
+                       return nil, err
+               }
+               endTS = startTS.Add(timeRange)
+       } else {
+               if endTS, err = parseTime(end); err != nil {
+                       return nil, err
+               }
+               startTS = endTS.Add(-timeRange)
+       }
+       s := startTS.Format(RFC3339)
+       e := endTS.Format(RFC3339)
+       if requests, err = parseNameAndGroupFromYAML(reader); err != nil {
+               return nil, err
+       }
+       for _, rb := range requests {
+               if rb.parsedData["timeRange"] != nil {
+                       continue
+               }
+               timeRange := make(map[string]interface{})
+               timeRange["start"] = s
+               timeRange["end"] = e
+               rb.parsedData["timeRange"] = timeRange
+               rb.data, err = json.Marshal(rb.parsedData)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return requests, nil
+}
+
+func parseTime(timestamp string) (time.Time, error) { // if timeStamp is 
duration(relative time), return (true, the uint of timestamp, nil).
+       if len(timestamp) < 1 {
+               return time.Time{}, errors.New("time is empty")
+       }
+       t, errAbsoluteTime := time.Parse(timestamp, RFC3339)
+       if errAbsoluteTime == nil {
+               return t, nil
+       }
+       duration, err := str2duration.ParseDuration(timestamp)
+       if err != nil {
+               return time.Time{}, 
errors.WithMessagef(multierr.Combine(errAbsoluteTime, err), "time %s is neither 
absolute time nor relative time", timestamp)
+       }
+       return time.Now().Add(duration), nil
+}
+
 type printer func(index int, reqBody reqBody, body []byte) error
 
 func yamlPrinter(index int, _ reqBody, body []byte) error {
@@ -146,7 +223,14 @@ func rest(pfn paramsFn, fn reqFn, printer printer) (err 
error) {
                if err != nil {
                        return err
                }
-               err = printer(i, r, resp.Body())
+               bd := resp.Body()
+               var st *stpb.Status
+               err = json.Unmarshal(bd, &st)
+               if err == nil && st.Code != int32(codes.OK) {
+                       s := status.FromProto(st)
+                       return s.Err()
+               }
+               err = printer(i, r, bd)
                if err != nil {
                        return err
                }
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index d6669a1..285afa1 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -31,6 +31,8 @@ import (
 var (
        filePath string
        name     string
+       start    string
+       end      string
        cfgFile  string
        rootCmd  = &cobra.Command{
                DisableAutoGenTag: true,
@@ -53,7 +55,7 @@ func RootCmdFlags(command *cobra.Command) {
        _ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
        viper.SetDefault("addr", "http://localhost:17913";)
 
-       command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd())
+       command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(), 
newMeasureCmd())
 }
 
 func init() {
@@ -108,3 +110,10 @@ func bindNameFlag(commands ...*cobra.Command) {
                _ = c.MarkFlagRequired("name")
        }
 }
+
+func bindTimeRangeFlag(commands ...*cobra.Command) {
+       for _, c := range commands {
+               c.Flags().StringVarP(&start, "start", "s", "", "Start time of 
the time range during which the query is preformed")
+               c.Flags().StringVarP(&end, "end", "e", "", "End time of the 
time range during which the query is preformed")
+       }
+}
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
index 719ec32..6db1a94 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/stream.go
@@ -104,7 +104,6 @@ func newStreamCmd() *cobra.Command {
                                })
                },
        }
-       bindFileFlag(createCmd, updateCmd)
 
        getCmd := &cobra.Command{
                Use:     "get [-g group] -n name",
@@ -145,19 +144,19 @@ func newStreamCmd() *cobra.Command {
        }
 
        queryCmd := &cobra.Command{
-               Use:     "query",
+               Use:     "query -f [file|dir|-]",
                Version: version.Build(),
                Short:   "Query data in a stream",
-               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
-                       return cmd.Parent().PersistentPreRunE(cmd.Parent(), 
args)
-               },
                RunE: func(cmd *cobra.Command, _ []string) (err error) {
-                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                       return rest(func() ([]reqBody, error) { return 
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
                                func(request request) (*resty.Response, error) {
                                        return 
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
                                }, yamlPrinter)
                },
        }
+       bindFileFlag(createCmd, updateCmd, queryCmd)
+       bindTimeRangeFlag(queryCmd)
+
        streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
        return streamCmd
 }
diff --git a/bydbctl/internal/cmd/stream_test.go 
b/bydbctl/internal/cmd/stream_test.go
index 966d99f..0d18c51 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -18,52 +18,42 @@
 package cmd_test
 
 import (
-       "context"
+       "fmt"
        "strings"
        "time"
 
-       "github.com/ghodss/yaml"
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/spf13/cobra"
        "github.com/zenizh/go-capturer"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
 
        database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
-       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
-       "github.com/apache/skywalking-banyandb/banyand/liaison/http"
-       "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
-       "github.com/apache/skywalking-banyandb/banyand/query"
-       "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/banyand/stream"
+       stream_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
-       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
-var _ = Describe("Stream", func() {
-       var path string
-       var gracefulStop, deferFunc func()
-       var listenClientURL, listenPeerURL string
+const (
+       RFC3339 = "2006-01-02T15:04:05Z07:00"
+)
+
+var _ = Describe("Stream Schema Operation", func() {
+       var addr string
+       var deferFunc func()
        var rootCmd *cobra.Command
        BeforeEach(func() {
-               var err error
-               path, deferFunc, err = test.NewSpace()
-               Expect(err).NotTo(HaveOccurred())
-               listenClientURL, listenPeerURL, err = test.NewEtcdListenUrls()
-               Expect(err).NotTo(HaveOccurred())
-               flags := []string{
-                       "--stream-root-path=" + path, "--measure-root-path=" + 
path, "--metadata-root-path=" + path,
-                       "--etcd-listen-client-url=" + listenClientURL, 
"--etcd-listen-peer-url=" + listenPeerURL,
-               }
-               gracefulStop = setup(false, flags)
-               Eventually(helpers.HTTPHealthCheck("localhost:17913"), 
10*time.Second).Should(Succeed())
-               time.Sleep(1 * time.Second)
+               _, addr, deferFunc = setup.SetUp()
+               Eventually(helpers.HTTPHealthCheck(addr), 
10*time.Second).Should(Succeed())
+               addr = "http://"; + addr
                // extracting the operation of creating stream schema
                rootCmd = &cobra.Command{Use: "root"}
                cmd.RootCmdFlags(rootCmd)
-               rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
                rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: group1
@@ -84,11 +74,16 @@ resource_opts:
                        Expect(err).NotTo(HaveOccurred())
                })
                Expect(out).To(ContainSubstring("group group1 is created"))
-               rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+               rootCmd.SetArgs([]string{"stream", "create", "-a", addr, "-f", 
"-"})
                rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: name1
-  group: group1`))
+  group: group1
+tagFamilies:
+  - name: searchable
+    tags: 
+      - name: trace_id
+        type: TAG_TYPE_STRING`))
                out = capturer.CaptureStdout(func() {
                        err := rootCmd.Execute()
                        Expect(err).NotTo(HaveOccurred())
@@ -102,8 +97,9 @@ metadata:
                        err := rootCmd.Execute()
                        Expect(err).NotTo(HaveOccurred())
                })
+               GinkgoWriter.Println(out)
                resp := new(database_v1.StreamRegistryServiceGetResponse)
-               Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+               helpers.UnmarshalYAML([]byte(out), resp)
                Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
                Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
        })
@@ -143,11 +139,8 @@ entity:
                Expect(out).To(ContainSubstring("stream group1.name1 is 
deleted"))
                // get again
                rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n", 
"name1"})
-               out = capturer.CaptureStdout(func() {
-                       err := rootCmd.Execute()
-                       Expect(err).NotTo(HaveOccurred())
-               })
-               Expect(out).To(ContainSubstring("resource not found"))
+               err := rootCmd.Execute()
+               Expect(err).To(MatchError("rpc error: code = NotFound desc = 
banyandb: resource not found"))
        })
 
        It("list stream schema", func() {
@@ -169,78 +162,117 @@ metadata:
                        Expect(err).NotTo(HaveOccurred())
                })
                resp := new(database_v1.StreamRegistryServiceListResponse)
-               Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+               helpers.UnmarshalYAML([]byte(out), resp)
                Expect(resp.Stream).To(HaveLen(2))
        })
 
        AfterEach(func() {
-               gracefulStop()
                deferFunc()
        })
 })
 
-func setup(loadMetadata bool, flags []string) func() {
-       // Init `Discovery` module
-       repo, err := discovery.NewServiceRepo(context.Background())
-       Expect(err).NotTo(HaveOccurred())
-       // Init `Queue` module
-       pipeline, err := queue.NewQueue(context.TODO(), repo)
-       Expect(err).NotTo(HaveOccurred())
-       // Init `Metadata` module
-       metaSvc, err := metadata.NewService(context.TODO())
-       Expect(err).NotTo(HaveOccurred())
-       // Init `Stream` module
-       streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, 
pipeline)
-       Expect(err).NotTo(HaveOccurred())
-       // Init `Measure` module
-       measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, 
pipeline)
-       Expect(err).NotTo(HaveOccurred())
-       // Init `Query` module
-       q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, 
metaSvc, repo, pipeline)
-       Expect(err).NotTo(HaveOccurred())
-       tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+var _ = Describe("Stream Data Query", func() {
+       var addr, grpcAddr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       var now time.Time
+       var nowStr, endStr string
+       var interval time.Duration
+       BeforeEach(func() {
+               now = timestamp.NowMilli()
+               nowStr = now.Format(RFC3339)
+               interval = 500 * time.Millisecond
+               endStr = now.Add(1 * time.Hour).Format(RFC3339)
+               grpcAddr, addr, deferFunc = setup.SetUp()
+               Eventually(helpers.HTTPHealthCheck(addr), 
10*time.Second).Should(Succeed())
+               addr = "http://"; + addr
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+       })
 
-       httpServer := http.NewService()
-       if loadMetadata {
-               return test.SetUpModules(
-                       flags,
-                       repo,
-                       pipeline,
-                       metaSvc,
-                       &preloadStreamService{metaSvc: metaSvc},
-                       &preloadMeasureService{metaSvc: metaSvc},
-                       streamSvc,
-                       measureSvc,
-                       q,
-                       tcp,
-                       httpServer,
+       It("query stream data", func() {
+               conn, err := grpclib.Dial(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
                )
-       }
-       return test.SetUpModules(
-               flags,
-               repo,
-               pipeline,
-               metaSvc,
-               streamSvc,
-               measureSvc,
-               q,
-               tcp,
-               httpServer,
-       )
-}
-
-type preloadStreamService struct {
-       metaSvc metadata.Service
-}
-
-type preloadMeasureService struct {
-       metaSvc metadata.Service
-}
+               Expect(err).NotTo(HaveOccurred())
 
-func (p *preloadStreamService) Name() string {
-       return "preload-stream"
-}
+               cases_stream_data.Write(conn, "data.json", now, interval)
+               rootCmd.SetArgs([]string{"stream", "query", "-a", addr, "-f", 
"-"})
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+metadata:
+  name: sw
+  group: default
+timeRange:
+  begin: %s
+  end: %s
+projection:
+  tagFamilies:
+    - name: searchable
+      tags:
+        - trace_id`, nowStr, endStr)))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(stream_v1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }).Should(Equal(5))
+       })
+       DescribeTable("query stream data with time range flags", func(timeArgs 
...string) {
+               conn, err := grpclib.Dial(
+                       grpcAddr,
+                       
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               )
+               Expect(err).NotTo(HaveOccurred())
+               now := timestamp.NowMilli()
+               interval := 500 * time.Millisecond
+               cases_stream_data.Write(conn, "data.json", now, interval)
+               args := []string{"stream", "query", "-a", addr}
+               args = append(args, timeArgs...)
+               args = append(args, "-f", "-")
+               rootCmd.SetArgs(args)
+               issue := func() string {
+                       rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: sw
+  group: default
+projection:
+  tagFamilies:
+    - name: searchable
+      tags:
+        - trace_id`))
+                       return capturer.CaptureStdout(func() {
+                               err := rootCmd.Execute()
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+               }
+               Eventually(issue).ShouldNot(ContainSubstring("code:"))
+               Eventually(func() int {
+                       out := issue()
+                       resp := new(stream_v1.QueryResponse)
+                       helpers.UnmarshalYAML([]byte(out), resp)
+                       GinkgoWriter.Println(resp)
+                       return len(resp.Elements)
+               }).Should(Equal(5))
+       },
+               Entry("relative start", "--start", "-30m"),
+               Entry("relative end", "--end", "0m"),
+               Entry("absolute start", "--start", nowStr),
+               Entry("absolute end", "--end", endStr),
+               Entry("default"),
+               Entry("all relative", "--start", "-30m", "--end", "0m"),
+               Entry("all absolute", "--start", nowStr, "--end", endStr),
+       )
 
-func (p *preloadMeasureService) Name() string {
-       return "preload-measure"
-}
+       AfterEach(func() {
+               deferFunc()
+       })
+})
diff --git a/dist/LICENSE b/dist/LICENSE
index 6074210..e41d8fb 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -264,6 +264,7 @@ BSD-3-Clause licenses
     github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause
     github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause
     github.com/spf13/pflag v1.0.5 BSD-3-Clause
+    github.com/xhit/go-str2duration/v2 v2.0.0 BSD-3-Clause
     golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause
     golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause
     golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause
@@ -336,7 +337,6 @@ MIT and Apache-2.0 licenses
 MIT and BSD-3-Clause licenses
 ========================================================================
 
-    github.com/ghodss/yaml v1.0.0 MIT and BSD-3-Clause
     sigs.k8s.io/yaml v1.2.0 MIT and BSD-3-Clause
 
 ========================================================================
diff --git a/dist/licenses/license-github.com-ghodss-yaml.txt 
b/dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
similarity index 55%
rename from dist/licenses/license-github.com-ghodss-yaml.txt
rename to dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
index 7805d36..ea5ea89 100644
--- a/dist/licenses/license-github.com-ghodss-yaml.txt
+++ b/dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
@@ -1,27 +1,4 @@
-The MIT License (MIT)
-
-Copyright (c) 2014 Sam Ghods
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
-
-
-Copyright (c) 2012 The Go Authors. All rights reserved.
+Copyright (c) 2009 The Go Authors. All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
 modification, are permitted provided that the following conditions are
@@ -47,4 +24,4 @@ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS 
OF USE,
 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 2abaffb..173cd7c 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,6 @@ require (
        github.com/dgraph-io/badger/v3 v3.2011.1
        github.com/dgraph-io/ristretto v0.1.0
        github.com/envoyproxy/protoc-gen-validate v0.1.0
-       github.com/ghodss/yaml v1.0.0
        github.com/go-chi/chi/v5 v5.0.7
        github.com/go-resty/resty/v2 v2.7.0
        github.com/golang/mock v1.6.0
@@ -30,6 +29,7 @@ require (
        github.com/spf13/pflag v1.0.5
        github.com/spf13/viper v1.12.0
        github.com/stretchr/testify v1.7.1
+       github.com/xhit/go-str2duration/v2 v2.0.0
        github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
        go.etcd.io/etcd/client/v3 v3.5.4
        go.etcd.io/etcd/server/v3 v3.5.4
diff --git a/go.sum b/go.sum
index 044c43c..df29d43 100644
--- a/go.sum
+++ b/go.sum
@@ -172,7 +172,6 @@ github.com/fsnotify/fsnotify v1.5.4 
h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwV
 github.com/fsnotify/fsnotify v1.5.4/go.mod 
h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
 github.com/getsentry/raven-go v0.2.0 
h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
 github.com/getsentry/raven-go v0.2.0/go.mod 
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
-github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
 github.com/go-chi/chi/v5 v5.0.7/go.mod 
h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
@@ -496,6 +495,8 @@ github.com/tmc/grpc-websocket-proxy 
v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 
h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod 
h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod 
h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
+github.com/xhit/go-str2duration/v2 v2.0.0 
h1:uFtk6FWB375bP7ewQl+/1wBcn840GPhnySOdcz/okPE=
+github.com/xhit/go-str2duration/v2 v2.0.0/go.mod 
h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 
h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod 
h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod 
h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
diff --git a/pkg/test/helpers/http_health.go b/pkg/test/helpers/http_health.go
index ec6aa2f..47fa01f 100644
--- a/pkg/test/helpers/http_health.go
+++ b/pkg/test/helpers/http_health.go
@@ -18,6 +18,7 @@ package helpers
 
 import (
        "fmt"
+       "time"
 
        "github.com/go-resty/resty/v2"
 )
@@ -30,11 +31,13 @@ func HTTPHealthCheck(addr string) func() error {
                        SetHeader("Accept", "application/json").
                        Get(fmt.Sprintf("http://%s/api/healthz";, addr))
                if err != nil {
+                       time.Sleep(1 * time.Second)
                        return err
                }
 
                if resp.StatusCode() != 200 {
                        l.Warn().Str("responded_status", 
resp.Status()).Msg("service unhealthy")
+                       time.Sleep(1 * time.Second)
                        return ErrServiceUnhealthy
                }
                l.Info().Stringer("response", resp).Msg("connected")
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 50997c3..ee5b011 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/query"
@@ -37,15 +38,18 @@ import (
 
 const host = "127.0.0.1"
 
-func SetUp(flags ...string) (string, func()) {
+func SetUp(flags ...string) (string, string, func()) {
        path, deferFn, err := test.NewSpace()
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        var ports []int
        ports, err = test.AllocateFreePorts(4)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        addr := fmt.Sprintf("%s:%d", host, ports[0])
+       httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
        ff := []string{
                "--addr=" + addr,
+               "--http-addr=" + httpAddr,
+               "--grpc-addr=" + addr,
                "--stream-root-path=" + path,
                "--measure-root-path=" + path,
                "--metadata-root-path=" + path,
@@ -55,7 +59,7 @@ func SetUp(flags ...string) (string, func()) {
                ff = append(ff, flags...)
        }
        gracefulStop := modules(ff)
-       return addr, func() {
+       return addr, httpAddr, func() {
                gracefulStop()
                deferFn()
        }
@@ -81,6 +85,7 @@ func modules(flags []string) func() {
        q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, 
metaSvc, repo, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+       httpServer := http.NewService()
 
        return test.SetUpModules(
                flags,
@@ -93,6 +98,7 @@ func modules(flags []string) func() {
                measureSvc,
                q,
                tcp,
+               httpServer,
        )
 }
 
diff --git a/test/cases/stream/stream.go b/test/cases/stream/data/data.go
similarity index 52%
copy from test/cases/stream/stream.go
copy to test/cases/stream/data/data.go
index 600862d..07b07f8 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/data/data.go
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package stream_test contains integration test cases of the stream
-package stream_test
+// Package data contains integration test cases of the stream
+package data
 
 import (
        "context"
@@ -24,7 +24,6 @@ import (
        "encoding/base64"
        "encoding/json"
        "io"
-       "math"
        "strconv"
        "time"
 
@@ -37,28 +36,29 @@ import (
        "google.golang.org/protobuf/types/known/timestamppb"
        "sigs.k8s.io/yaml"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       common_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       model_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       stream_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
 )
 
-// SharedContext is the parallel execution context
-var SharedContext helpers.SharedContext
-
 //go:embed input/*.yaml
 var inputFS embed.FS
 
 //go:embed want/*.yaml
 var wantFS embed.FS
 
-var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// VerifyFn verify whether the query response matches the wanted result
+var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
        i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-       innerGm.Expect(err).NotTo(gm.HaveOccurred())
-       query := &streamv1.QueryRequest{}
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       query := &stream_v1.QueryRequest{}
        helpers.UnmarshalYAML(i, query)
-       query.TimeRange = helpers.TimeRange(args, SharedContext)
-       c := streamv1.NewStreamServiceClient(SharedContext.Connection)
+       query.TimeRange = helpers.TimeRange(args, sharedContext)
+       c := stream_v1.NewStreamServiceClient(sharedContext.Connection)
        ctx := context.Background()
        resp, err := c.Query(ctx, query)
        if args.WantErr {
@@ -67,21 +67,21 @@ var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
                }
                return
        }
-       innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+       gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
        if args.WantEmpty {
-               innerGm.Expect(resp.Elements).To(gm.BeEmpty())
+               gm.Expect(resp.Elements).To(gm.BeEmpty())
                return
        }
        if args.Want == "" {
                args.Want = args.Input
        }
        ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-       innerGm.Expect(err).NotTo(gm.HaveOccurred())
-       want := &streamv1.QueryResponse{}
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       want := &stream_v1.QueryResponse{}
        helpers.UnmarshalYAML(ww, want)
-       innerGm.Expect(cmp.Equal(resp, want,
+       gm.Expect(cmp.Equal(resp, want,
                protocmp.IgnoreUnknown(),
-               protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
+               protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
                protocmp.Transform())).
                To(gm.BeTrue(), func() string {
                        j, err := protojson.Marshal(resp)
@@ -96,36 +96,7 @@ var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
                })
 }
 
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
-       gm.Eventually(func(innerGm gm.Gomega) {
-               verifyFn(innerGm, args)
-       })
-},
-       g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * 
time.Hour}),
-       g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
-       g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * 
time.Hour}),
-       g.Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
-       g.Entry("invalid time range", helpers.Args{
-               Input: "all",
-               Begin: timestamppb.New(time.Unix(0, 
int64(math.MinInt64+time.Millisecond)).Truncate(time.Millisecond)),
-               End:   timestamppb.New(time.Unix(0, 
math.MaxInt64).Truncate(time.Millisecond)),
-       }),
-       g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * 
time.Hour}),
-       g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 
* time.Hour}),
-       g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", 
Duration: 1 * time.Hour}),
-       g.Entry("get empty result by non-indexed tag", helpers.Args{Input: 
"filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
-       g.Entry("numeric local index: less", helpers.Args{Input: "less", 
Duration: 1 * time.Hour}),
-       g.Entry("numeric local index: less and eq", helpers.Args{Input: 
"less_eq", Duration: 1 * time.Hour}),
-       g.Entry("logical expression", helpers.Args{Input: "logical", Duration: 
1 * time.Hour}),
-       g.Entry("having", helpers.Args{Input: "having", Duration: 1 * 
time.Hour}),
-       g.Entry("full text searching", helpers.Args{Input: "search", Duration: 
1 * time.Hour}),
-       g.Entry("indexed only tags", helpers.Args{Input: "indexed_only", 
Duration: 1 * time.Hour}),
-)
-
-//go:embed testdata/*.json
-var dataFS embed.FS
-
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string, 
baseTime time.Time, interval time.Duration) {
+func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, 
baseTime time.Time, interval time.Duration) {
        var templates []interface{}
        content, err := dataFS.ReadFile("testdata/" + dataFile)
        gm.Expect(err).ShouldNot(gm.HaveOccurred())
@@ -134,16 +105,16 @@ func loadData(stream streamv1.StreamService_WriteClient, 
dataFile string, baseTi
        for i, template := range templates {
                rawSearchTagFamily, errMarshal := json.Marshal(template)
                gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-               searchTagFamily := &modelv1.TagFamilyForWrite{}
+               searchTagFamily := &model_v1.TagFamilyForWrite{}
                gm.Expect(protojson.Unmarshal(rawSearchTagFamily, 
searchTagFamily)).ShouldNot(gm.HaveOccurred())
-               e := &streamv1.ElementValue{
+               e := &stream_v1.ElementValue{
                        ElementId: strconv.Itoa(i),
                        Timestamp: timestamppb.New(baseTime.Add(interval * 
time.Duration(i))),
-                       TagFamilies: []*modelv1.TagFamilyForWrite{
+                       TagFamilies: []*model_v1.TagFamilyForWrite{
                                {
-                                       Tags: []*modelv1.TagValue{
+                                       Tags: []*model_v1.TagValue{
                                                {
-                                                       Value: 
&modelv1.TagValue_BinaryData{
+                                                       Value: 
&model_v1.TagValue_BinaryData{
                                                                BinaryData: bb,
                                                        },
                                                },
@@ -152,8 +123,8 @@ func loadData(stream streamv1.StreamService_WriteClient, 
dataFile string, baseTi
                        },
                }
                e.TagFamilies = append(e.TagFamilies, searchTagFamily)
-               errInner := stream.Send(&streamv1.WriteRequest{
-                       Metadata: &commonv1.Metadata{
+               errInner := stream.Send(&stream_v1.WriteRequest{
+                       Metadata: &common_v1.Metadata{
                                Name:  "sw",
                                Group: "default",
                        },
@@ -165,7 +136,7 @@ func loadData(stream streamv1.StreamService_WriteClient, 
dataFile string, baseTi
 
 // Write data into the server
 func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, 
interval time.Duration) {
-       c := streamv1.NewStreamServiceClient(conn)
+       c := stream_v1.NewStreamServiceClient(conn)
        ctx := context.Background()
        writeClient, err := c.Write(ctx)
        gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/cases/stream/input/all.yaml 
b/test/cases/stream/data/input/all.yaml
similarity index 100%
rename from test/cases/stream/input/all.yaml
rename to test/cases/stream/data/input/all.yaml
diff --git a/test/cases/stream/input/filter_tag.yaml 
b/test/cases/stream/data/input/filter_tag.yaml
similarity index 100%
rename from test/cases/stream/input/filter_tag.yaml
rename to test/cases/stream/data/input/filter_tag.yaml
diff --git a/test/cases/stream/input/filter_tag_empty.yaml 
b/test/cases/stream/data/input/filter_tag_empty.yaml
similarity index 100%
rename from test/cases/stream/input/filter_tag_empty.yaml
rename to test/cases/stream/data/input/filter_tag_empty.yaml
diff --git a/test/cases/stream/input/global_index.yaml 
b/test/cases/stream/data/input/global_index.yaml
similarity index 100%
rename from test/cases/stream/input/global_index.yaml
rename to test/cases/stream/data/input/global_index.yaml
diff --git a/test/cases/stream/input/having.yaml 
b/test/cases/stream/data/input/having.yaml
similarity index 100%
rename from test/cases/stream/input/having.yaml
rename to test/cases/stream/data/input/having.yaml
diff --git a/test/cases/stream/input/indexed_only.yaml 
b/test/cases/stream/data/input/indexed_only.yaml
similarity index 100%
rename from test/cases/stream/input/indexed_only.yaml
rename to test/cases/stream/data/input/indexed_only.yaml
diff --git a/test/cases/stream/input/less.yaml 
b/test/cases/stream/data/input/less.yaml
similarity index 100%
rename from test/cases/stream/input/less.yaml
rename to test/cases/stream/data/input/less.yaml
diff --git a/test/cases/stream/input/less_eq.yaml 
b/test/cases/stream/data/input/less_eq.yaml
similarity index 100%
rename from test/cases/stream/input/less_eq.yaml
rename to test/cases/stream/data/input/less_eq.yaml
diff --git a/test/cases/stream/input/limit.yaml 
b/test/cases/stream/data/input/limit.yaml
similarity index 100%
rename from test/cases/stream/input/limit.yaml
rename to test/cases/stream/data/input/limit.yaml
diff --git a/test/cases/stream/input/logical.yaml 
b/test/cases/stream/data/input/logical.yaml
similarity index 100%
rename from test/cases/stream/input/logical.yaml
rename to test/cases/stream/data/input/logical.yaml
diff --git a/test/cases/stream/input/offset.yaml 
b/test/cases/stream/data/input/offset.yaml
similarity index 100%
rename from test/cases/stream/input/offset.yaml
rename to test/cases/stream/data/input/offset.yaml
diff --git a/test/cases/stream/input/search.yaml 
b/test/cases/stream/data/input/search.yaml
similarity index 100%
rename from test/cases/stream/input/search.yaml
rename to test/cases/stream/data/input/search.yaml
diff --git a/test/cases/stream/input/sort_desc.yaml 
b/test/cases/stream/data/input/sort_desc.yaml
similarity index 100%
rename from test/cases/stream/input/sort_desc.yaml
rename to test/cases/stream/data/input/sort_desc.yaml
diff --git a/test/cases/stream/testdata/data.json 
b/test/cases/stream/data/testdata/data.json
similarity index 100%
rename from test/cases/stream/testdata/data.json
rename to test/cases/stream/data/testdata/data.json
diff --git a/test/cases/stream/want/all.yaml 
b/test/cases/stream/data/want/all.yaml
similarity index 100%
rename from test/cases/stream/want/all.yaml
rename to test/cases/stream/data/want/all.yaml
diff --git a/test/cases/stream/want/filter_tag.yaml 
b/test/cases/stream/data/want/filter_tag.yaml
similarity index 100%
rename from test/cases/stream/want/filter_tag.yaml
rename to test/cases/stream/data/want/filter_tag.yaml
diff --git a/test/cases/stream/want/global_index.yaml 
b/test/cases/stream/data/want/global_index.yaml
similarity index 100%
rename from test/cases/stream/want/global_index.yaml
rename to test/cases/stream/data/want/global_index.yaml
diff --git a/test/cases/stream/want/having.yaml 
b/test/cases/stream/data/want/having.yaml
similarity index 100%
rename from test/cases/stream/want/having.yaml
rename to test/cases/stream/data/want/having.yaml
diff --git a/test/cases/stream/want/indexed_only.yaml 
b/test/cases/stream/data/want/indexed_only.yaml
similarity index 100%
rename from test/cases/stream/want/indexed_only.yaml
rename to test/cases/stream/data/want/indexed_only.yaml
diff --git a/test/cases/stream/want/less.yaml 
b/test/cases/stream/data/want/less.yaml
similarity index 100%
rename from test/cases/stream/want/less.yaml
rename to test/cases/stream/data/want/less.yaml
diff --git a/test/cases/stream/want/less_eq.yaml 
b/test/cases/stream/data/want/less_eq.yaml
similarity index 100%
rename from test/cases/stream/want/less_eq.yaml
rename to test/cases/stream/data/want/less_eq.yaml
diff --git a/test/cases/stream/want/limit.yaml 
b/test/cases/stream/data/want/limit.yaml
similarity index 100%
rename from test/cases/stream/want/limit.yaml
rename to test/cases/stream/data/want/limit.yaml
diff --git a/test/cases/stream/want/logical.yaml 
b/test/cases/stream/data/want/logical.yaml
similarity index 100%
rename from test/cases/stream/want/logical.yaml
rename to test/cases/stream/data/want/logical.yaml
diff --git a/test/cases/stream/want/offset.yaml 
b/test/cases/stream/data/want/offset.yaml
similarity index 100%
rename from test/cases/stream/want/offset.yaml
rename to test/cases/stream/data/want/offset.yaml
diff --git a/test/cases/stream/want/search.yaml 
b/test/cases/stream/data/want/search.yaml
similarity index 100%
rename from test/cases/stream/want/search.yaml
rename to test/cases/stream/data/want/search.yaml
diff --git a/test/cases/stream/want/sort_desc.yaml 
b/test/cases/stream/data/want/sort_desc.yaml
similarity index 100%
rename from test/cases/stream/want/sort_desc.yaml
rename to test/cases/stream/data/want/sort_desc.yaml
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 600862d..88f7ab8 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -19,88 +19,25 @@
 package stream_test
 
 import (
-       "context"
-       "embed"
-       "encoding/base64"
-       "encoding/json"
-       "io"
        "math"
-       "strconv"
        "time"
 
-       "github.com/google/go-cmp/cmp"
        g "github.com/onsi/ginkgo/v2"
-       gm "github.com/onsi/gomega"
-       grpclib "google.golang.org/grpc"
-       "google.golang.org/protobuf/encoding/protojson"
-       "google.golang.org/protobuf/testing/protocmp"
        "google.golang.org/protobuf/types/known/timestamppb"
-       "sigs.k8s.io/yaml"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       stream_test_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
-// SharedContext is the parallel execution context
-var SharedContext helpers.SharedContext
-
-//go:embed input/*.yaml
-var inputFS embed.FS
-
-//go:embed want/*.yaml
-var wantFS embed.FS
-
-var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
-       i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-       innerGm.Expect(err).NotTo(gm.HaveOccurred())
-       query := &streamv1.QueryRequest{}
-       helpers.UnmarshalYAML(i, query)
-       query.TimeRange = helpers.TimeRange(args, SharedContext)
-       c := streamv1.NewStreamServiceClient(SharedContext.Connection)
-       ctx := context.Background()
-       resp, err := c.Query(ctx, query)
-       if args.WantErr {
-               if err == nil {
-                       g.Fail("expect error")
-               }
-               return
-       }
-       innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
-       if args.WantEmpty {
-               innerGm.Expect(resp.Elements).To(gm.BeEmpty())
-               return
-       }
-       if args.Want == "" {
-               args.Want = args.Input
+var (
+       // SharedContext is the parallel execution context
+       SharedContext helpers.SharedContext
+       verify        = func(args helpers.Args) {
+               stream_test_data.VerifyFn(SharedContext, args)
        }
-       ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-       innerGm.Expect(err).NotTo(gm.HaveOccurred())
-       want := &streamv1.QueryResponse{}
-       helpers.UnmarshalYAML(ww, want)
-       innerGm.Expect(cmp.Equal(resp, want,
-               protocmp.IgnoreUnknown(),
-               protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
-               protocmp.Transform())).
-               To(gm.BeTrue(), func() string {
-                       j, err := protojson.Marshal(resp)
-                       if err != nil {
-                               return err.Error()
-                       }
-                       y, err := yaml.JSONToYAML(j)
-                       if err != nil {
-                               return err.Error()
-                       }
-                       return string(y)
-               })
-}
+)
 
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
-       gm.Eventually(func(innerGm gm.Gomega) {
-               verifyFn(innerGm, args)
-       })
-},
+var _ = g.DescribeTable("Scanning Streams", verify,
        g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * 
time.Hour}),
        g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
        g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * 
time.Hour}),
@@ -121,58 +58,3 @@ var _ = g.DescribeTable("Scanning Streams", func(args 
helpers.Args) {
        g.Entry("full text searching", helpers.Args{Input: "search", Duration: 
1 * time.Hour}),
        g.Entry("indexed only tags", helpers.Args{Input: "indexed_only", 
Duration: 1 * time.Hour}),
 )
-
-//go:embed testdata/*.json
-var dataFS embed.FS
-
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string, 
baseTime time.Time, interval time.Duration) {
-       var templates []interface{}
-       content, err := dataFS.ReadFile("testdata/" + dataFile)
-       gm.Expect(err).ShouldNot(gm.HaveOccurred())
-       gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
-       bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
-       for i, template := range templates {
-               rawSearchTagFamily, errMarshal := json.Marshal(template)
-               gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-               searchTagFamily := &modelv1.TagFamilyForWrite{}
-               gm.Expect(protojson.Unmarshal(rawSearchTagFamily, 
searchTagFamily)).ShouldNot(gm.HaveOccurred())
-               e := &streamv1.ElementValue{
-                       ElementId: strconv.Itoa(i),
-                       Timestamp: timestamppb.New(baseTime.Add(interval * 
time.Duration(i))),
-                       TagFamilies: []*modelv1.TagFamilyForWrite{
-                               {
-                                       Tags: []*modelv1.TagValue{
-                                               {
-                                                       Value: 
&modelv1.TagValue_BinaryData{
-                                                               BinaryData: bb,
-                                                       },
-                                               },
-                                       },
-                               },
-                       },
-               }
-               e.TagFamilies = append(e.TagFamilies, searchTagFamily)
-               errInner := stream.Send(&streamv1.WriteRequest{
-                       Metadata: &commonv1.Metadata{
-                               Name:  "sw",
-                               Group: "default",
-                       },
-                       Element: e,
-               })
-               gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
-       }
-}
-
-// Write data into the server
-func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, 
interval time.Duration) {
-       c := streamv1.NewStreamServiceClient(conn)
-       ctx := context.Background()
-       writeClient, err := c.Write(ctx)
-       gm.Expect(err).NotTo(gm.HaveOccurred())
-       loadData(writeClient, dataFile, baseTime, interval)
-       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
-       gm.Eventually(func() error {
-               _, err := writeClient.Recv()
-               return err
-       }).Should(gm.Equal(io.EOF))
-}
diff --git a/test/integration/cold_query/query_suite_test.go 
b/test/integration/cold_query/query_suite_test.go
index 3971560..9b35b6c 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -21,8 +21,8 @@ import (
        "testing"
        "time"
 
-       g "github.com/onsi/ginkgo/v2"
-       gm "github.com/onsi/gomega"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
        grpclib "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
@@ -30,14 +30,15 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
-       casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
-       casesMeasureData 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
-       casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+       cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
 func TestIntegrationColdQuery(t *testing.T) {
-       gm.RegisterFailHandler(g.Fail)
-       g.RunSpecs(t, "Integration Query Cold Data Suite")
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Integration Query Cold Data Suite")
 }
 
 var (
@@ -46,25 +47,25 @@ var (
        deferFunc  func()
 )
 
-var _ = g.SynchronizedBeforeSuite(func() []byte {
-       gm.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
                Env:   "dev",
                Level: "warn",
-       })).To(gm.Succeed())
+       })).To(Succeed())
        var addr string
-       addr, deferFunc = setup.SetUp()
+       addr, _, deferFunc = setup.SetUp()
        conn, err := grpclib.Dial(
                addr,
                grpclib.WithTransportCredentials(insecure.NewCredentials()),
        )
-       gm.Expect(err).NotTo(gm.HaveOccurred())
+       Expect(err).NotTo(HaveOccurred())
        now = timestamp.NowMilli().Add(-time.Hour * 24)
        interval := 500 * time.Millisecond
-       casesStream.Write(conn, "data.json", now, interval)
-       casesMeasureData.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data.json", now, interval)
-       casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
-       casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
-       gm.Expect(conn.Close()).To(gm.Succeed())
+       cases_stream_data.Write(conn, "data.json", now, interval)
+       cases_measure_data.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data.json", now, interval)
+       cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
+       cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
+       Expect(conn.Close()).To(Succeed())
        return []byte(addr)
 }, func(address []byte) {
        var err error
@@ -73,20 +74,20 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
                grpclib.WithTransportCredentials(insecure.NewCredentials()),
                grpclib.WithBlock(),
        )
-       casesStream.SharedContext = helpers.SharedContext{
+       cases_stream.SharedContext = helpers.SharedContext{
                Connection: connection,
                BaseTime:   now,
        }
-       casesMeasure.SharedContext = helpers.SharedContext{
+       cases_measure.SharedContext = helpers.SharedContext{
                Connection: connection,
                BaseTime:   now,
        }
-       gm.Expect(err).NotTo(gm.HaveOccurred())
+       Expect(err).NotTo(HaveOccurred())
 })
 
-var _ = g.SynchronizedAfterSuite(func() {
+var _ = SynchronizedAfterSuite(func() {
        if connection != nil {
-               gm.Expect(connection.Close()).To(gm.Succeed())
+               Expect(connection.Close()).To(Succeed())
        }
 }, func() {
        deferFunc()
diff --git a/test/integration/other/measure_test.go 
b/test/integration/other/measure_test.go
index f162e64..d1c0e55 100644
--- a/test/integration/other/measure_test.go
+++ b/test/integration/other/measure_test.go
@@ -39,7 +39,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
 
        g.BeforeEach(func() {
                var addr string
-               addr, deferFn = setup.SetUp()
+               addr, _, deferFn = setup.SetUp()
                var err error
                conn, err = grpclib.Dial(
                        addr,
diff --git a/test/integration/other/property_test.go 
b/test/integration/other/property_test.go
index 37282b7..3a1d5ff 100644
--- a/test/integration/other/property_test.go
+++ b/test/integration/other/property_test.go
@@ -38,7 +38,7 @@ var _ = Describe("Property application", func() {
 
        BeforeEach(func() {
                var addr string
-               addr, deferFn = setup.SetUp()
+               addr, _, deferFn = setup.SetUp()
                var err error
                conn, err = grpclib.Dial(
                        addr,
@@ -101,7 +101,7 @@ var _ = Describe("Property application", func() {
 
        BeforeEach(func() {
                var addr string
-               addr, deferFn = setup.SetUp()
+               addr, _, deferFn = setup.SetUp()
                var err error
                conn, err = grpclib.Dial(
                        addr,
diff --git a/test/integration/other/tls_test.go 
b/test/integration/other/tls_test.go
index 97fc855..f363ae5 100644
--- a/test/integration/other/tls_test.go
+++ b/test/integration/other/tls_test.go
@@ -45,7 +45,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
                certFile := filepath.Join(basePath, "testdata/server_cert.pem")
                keyFile := filepath.Join(basePath, "testdata/server_key.pem")
                var addr string
-               addr, deferFn = setup.SetUp("--tls=true", 
"--cert-file="+certFile, "--key-file="+keyFile)
+               addr, _, deferFn = setup.SetUp("--tls=true", 
"--cert-file="+certFile, "--key-file="+keyFile)
                var err error
                creds, err := credentials.NewClientTLSFromFile(certFile, 
"localhost")
                gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/integration/query/query_suite_test.go 
b/test/integration/query/query_suite_test.go
index 3579482..b1a3092 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -21,8 +21,8 @@ import (
        "testing"
        "time"
 
-       g "github.com/onsi/ginkgo/v2"
-       gm "github.com/onsi/gomega"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
        grpclib "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
@@ -33,11 +33,12 @@ import (
        cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
        cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
 func TestIntegrationQuery(t *testing.T) {
-       gm.RegisterFailHandler(g.Fail)
-       g.RunSpecs(t, "Integration Query Suite")
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Integration Query Suite")
 }
 
 var (
@@ -46,25 +47,25 @@ var (
        deferFunc  func()
 )
 
-var _ = g.SynchronizedBeforeSuite(func() []byte {
-       gm.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
                Env:   "dev",
                Level: "warn",
-       })).To(gm.Succeed())
+       })).To(Succeed())
        var addr string
-       addr, deferFunc = setup.SetUp()
+       addr, _, deferFunc = setup.SetUp()
        conn, err := grpclib.Dial(
                addr,
                grpclib.WithTransportCredentials(insecure.NewCredentials()),
        )
-       gm.Expect(err).NotTo(gm.HaveOccurred())
+       Expect(err).NotTo(HaveOccurred())
        now = timestamp.NowMilli()
        interval := 500 * time.Millisecond
-       cases_stream.Write(conn, "data.json", now, interval)
+       cases_stream_data.Write(conn, "data.json", now, interval)
        cases_measure_data.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data.json", now, interval)
        cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
        cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
-       gm.Expect(conn.Close()).To(gm.Succeed())
+       Expect(conn.Close()).To(Succeed())
        return []byte(addr)
 }, func(address []byte) {
        var err error
@@ -81,12 +82,12 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
-       gm.Expect(err).NotTo(gm.HaveOccurred())
+       Expect(err).NotTo(HaveOccurred())
 })
 
-var _ = g.SynchronizedAfterSuite(func() {
+var _ = SynchronizedAfterSuite(func() {
        if connection != nil {
-               gm.Expect(connection.Close()).To(gm.Succeed())
+               Expect(connection.Close()).To(Succeed())
        }
 }, func() {
        deferFunc()


Reply via email to