This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ebdbb62 add measure and group cmd (#180)
ebdbb62 is described below
commit ebdbb62c066223a9e9b106da0c60ea4e50f559b2
Author: sacloud <[email protected]>
AuthorDate: Sun Oct 9 22:36:36 2022 +0800
add measure and group cmd (#180)
* add measure and group cmd
* Add metadata verification
* Implement grpc health check client
* Handle grpc error
* add start and end flag
Signed-off-by: Gao Hongtao <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
api/common/id.go | 13 ++
banyand/liaison/grpc/measure.go | 13 +-
banyand/liaison/grpc/stream.go | 13 +-
banyand/liaison/http/health.go | 42 +++-
banyand/liaison/http/server.go | 37 ++--
banyand/query/processor.go | 33 ++-
bydbctl/internal/cmd/group.go | 66 +++++-
bydbctl/internal/cmd/group_test.go | 150 ++++++++++++++
bydbctl/internal/cmd/{stream.go => measure.go} | 74 +++----
bydbctl/internal/cmd/measure_test.go | 220 ++++++++++++++++++++
bydbctl/internal/cmd/rest.go | 98 ++++++++-
bydbctl/internal/cmd/root.go | 11 +-
bydbctl/internal/cmd/stream.go | 11 +-
bydbctl/internal/cmd/stream_test.go | 230 ++++++++++++---------
dist/LICENSE | 2 +-
...license-github.com-xhit-go-str2duration-v2.txt} | 27 +--
go.mod | 2 +-
go.sum | 3 +-
pkg/test/helpers/http_health.go | 3 +
pkg/test/setup/setup.go | 10 +-
test/cases/stream/{stream.go => data/data.go} | 87 +++-----
test/cases/stream/{ => data}/input/all.yaml | 0
test/cases/stream/{ => data}/input/filter_tag.yaml | 0
.../stream/{ => data}/input/filter_tag_empty.yaml | 0
.../stream/{ => data}/input/global_index.yaml | 0
test/cases/stream/{ => data}/input/having.yaml | 0
.../stream/{ => data}/input/indexed_only.yaml | 0
test/cases/stream/{ => data}/input/less.yaml | 0
test/cases/stream/{ => data}/input/less_eq.yaml | 0
test/cases/stream/{ => data}/input/limit.yaml | 0
test/cases/stream/{ => data}/input/logical.yaml | 0
test/cases/stream/{ => data}/input/offset.yaml | 0
test/cases/stream/{ => data}/input/search.yaml | 0
test/cases/stream/{ => data}/input/sort_desc.yaml | 0
test/cases/stream/{ => data}/testdata/data.json | 0
test/cases/stream/{ => data}/want/all.yaml | 0
test/cases/stream/{ => data}/want/filter_tag.yaml | 0
.../cases/stream/{ => data}/want/global_index.yaml | 0
test/cases/stream/{ => data}/want/having.yaml | 0
.../cases/stream/{ => data}/want/indexed_only.yaml | 0
test/cases/stream/{ => data}/want/less.yaml | 0
test/cases/stream/{ => data}/want/less_eq.yaml | 0
test/cases/stream/{ => data}/want/limit.yaml | 0
test/cases/stream/{ => data}/want/logical.yaml | 0
test/cases/stream/{ => data}/want/offset.yaml | 0
test/cases/stream/{ => data}/want/search.yaml | 0
test/cases/stream/{ => data}/want/sort_desc.yaml | 0
test/cases/stream/stream.go | 134 +-----------
test/integration/cold_query/query_suite_test.go | 45 ++--
test/integration/other/measure_test.go | 2 +-
test/integration/other/property_test.go | 4 +-
test/integration/other/tls_test.go | 2 +-
test/integration/query/query_suite_test.go | 29 +--
53 files changed, 900 insertions(+), 461 deletions(-)
diff --git a/api/common/id.go b/api/common/id.go
index 7443083..e106b54 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -19,6 +19,7 @@ package common
import (
"context"
+ "fmt"
"github.com/prometheus/client_golang/prometheus"
@@ -69,3 +70,15 @@ func SetPosition(ctx context.Context, fn func(p Position)
Position) context.Cont
}
return context.WithValue(ctx, PositionKey, fn(p))
}
+
+type Error struct {
+ msg string
+}
+
+func NewError(tpl string, args ...any) Error {
+ return Error{msg: fmt.Sprintf(tpl, args...)}
+}
+
+func (e Error) Msg() string {
+ return e.msg
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index c77a4b5..5cc53c9 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -22,9 +22,11 @@ import (
"io"
"time"
+ "github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -99,11 +101,14 @@ func (ms *measureService) Query(_ context.Context,
entityCriteria *measurev1.Que
if errFeat != nil {
return nil, errFeat
}
- queryMsg, ok := msg.Data().([]*measurev1.DataPoint)
- if !ok {
- return nil, ErrQueryMsg
+ data := msg.Data()
+ switch d := data.(type) {
+ case []*measurev1.DataPoint:
+ return &measurev1.QueryResponse{DataPoints: d}, nil
+ case common.Error:
+ return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
}
- return &measurev1.QueryResponse{DataPoints: queryMsg}, nil
+ return nil, ErrQueryMsg
}
// TODO: implement topN
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index cc61a8f..8cf6e1b 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -22,9 +22,11 @@ import (
"io"
"time"
+ "github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -103,9 +105,12 @@ func (s *streamService) Query(_ context.Context,
entityCriteria *streamv1.QueryR
if errFeat != nil {
return nil, errFeat
}
- queryMsg, ok := msg.Data().([]*streamv1.Element)
- if !ok {
- return nil, ErrQueryMsg
+ data := msg.Data()
+ switch d := data.(type) {
+ case []*streamv1.Element:
+ return &streamv1.QueryResponse{Elements: d}, nil
+ case common.Error:
+ return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
}
- return &streamv1.QueryResponse{Elements: queryMsg}, nil
+ return nil, ErrQueryMsg
}
diff --git a/banyand/liaison/http/health.go b/banyand/liaison/http/health.go
index 8adf613..3912c7b 100644
--- a/banyand/liaison/http/health.go
+++ b/banyand/liaison/http/health.go
@@ -19,17 +19,55 @@ package http
import (
"context"
+ "time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
-type healthCheckClient struct{}
+func newHealthCheckClient(ctx context.Context, l *logger.Logger, addr string,
opts []grpc.DialOption) (client *healthCheckClient, err error) {
+ conn, err := grpc.Dial(addr, opts...)
+ if err != nil {
+ return nil, err
+ }
+ defer func() {
+ if err != nil {
+ if cerr := conn.Close(); cerr != nil {
+ l.Info().Str("addr",
addr).Err(cerr).Msg("Failed to close conn")
+ }
+ return
+ }
+ go func() {
+ <-ctx.Done()
+ if cerr := conn.Close(); cerr != nil {
+ l.Info().Str("addr",
addr).Err(cerr).Msg("Failed to close conn")
+ }
+ }()
+ }()
+ return &healthCheckClient{conn: conn}, nil
+}
+
+type healthCheckClient struct {
+ conn *grpc.ClientConn
+}
func (g *healthCheckClient) Check(ctx context.Context, r
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption)
(*grpc_health_v1.HealthCheckResponse, error) {
- return &grpc_health_v1.HealthCheckResponse{Status:
grpc_health_v1.HealthCheckResponse_SERVING}, nil
+ var resp *grpc_health_v1.HealthCheckResponse
+ if err := grpchelper.Request(context.Background(), 10*time.Second,
func(rpcCtx context.Context) (err error) {
+ resp, err = grpc_health_v1.NewHealthClient(g.conn).Check(rpcCtx,
+ &grpc_health_v1.HealthCheckRequest{
+ Service: "",
+ })
+ return err
+ }); err != nil {
+ return nil, err
+ }
+ return resp, nil
}
func (g *healthCheckClient) Watch(ctx context.Context, r
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption)
(grpc_health_v1.Health_WatchClient, error) {
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index b078211..7800296 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -67,7 +67,7 @@ type service struct {
func (p *service) FlagSet() *run.FlagSet {
flagSet := run.NewFlagSet("")
flagSet.StringVar(&p.listenAddr, "http-addr", ":17913", "listen addr
for http")
- flagSet.StringVar(&p.grpcAddr, "grcp-addr", "localhost:17912", "the
grpc addr")
+ flagSet.StringVar(&p.grpcAddr, "grpc-addr", "localhost:17912", "the
grpc addr")
return flagSet
}
@@ -92,27 +92,26 @@ func (p *service) PreRun() error {
serveIndex := serveFileContents("index.html", httpFS)
p.mux.Mount("/", intercept404(fileServer, serveIndex))
- gwMux :=
runtime.NewServeMux(runtime.WithHealthzEndpoint(&healthCheckClient{}))
var ctx context.Context
ctx, p.clientCloser = context.WithCancel(context.Background())
-
+ opts := []grpc.DialOption{
+ // TODO: add TLS
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ }
+ client, err := newHealthCheckClient(ctx, p.l, p.grpcAddr, opts)
+ if err != nil {
+ return err
+ }
+ gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
err = multierr.Combine(
-
database_v1.RegisterStreamRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-
database_v1.RegisterMeasureRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-
database_v1.RegisterIndexRuleRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-
database_v1.RegisterIndexRuleBindingRegistryServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
-
database_v1.RegisterGroupRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
- stream_v1.RegisterStreamServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
- measure_v1.RegisterMeasureServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
- property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr,
-
[]grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}),
+
database_v1.RegisterStreamRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr, opts),
+
database_v1.RegisterMeasureRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr, opts),
+
database_v1.RegisterIndexRuleRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr, opts),
+
database_v1.RegisterIndexRuleBindingRegistryServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr, opts),
+
database_v1.RegisterGroupRegistryServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr, opts),
+ stream_v1.RegisterStreamServiceHandlerFromEndpoint(ctx, gwMux,
p.grpcAddr, opts),
+ measure_v1.RegisterMeasureServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr, opts),
+ property_v1.RegisterPropertyServiceHandlerFromEndpoint(ctx,
gwMux, p.grpcAddr, opts),
)
if err != nil {
return err
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index c5ba8ff..64d6146 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/multierr"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -63,9 +64,10 @@ type streamQueryProcessor struct {
}
func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
+ now := time.Now().UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
if !ok {
- p.log.Warn().Msg("invalid event data type")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
return
}
p.log.Debug().Stringer("criteria", queryCriteria).Msg("received a query
request")
@@ -73,27 +75,25 @@ func (p *streamQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
meta := queryCriteria.GetMetadata()
ec, err := p.streamService.Stream(meta)
if err != nil {
- p.log.Error().Err(err).
- Str("stream", meta.GetName()).
- Msg("fail to get execution context for stream")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to get execution context for stream %s: %v", meta.GetName(), err))
return
}
analyzer, err :=
logical_stream.CreateAnalyzerFromMetaService(p.metaService)
if err != nil {
- p.log.Error().Err(err).Msg("fail to build analyzer")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build analyzer for stream %s: %v", meta.GetName(), err))
return
}
s, err := analyzer.BuildSchema(context.TODO(), meta)
if err != nil {
- p.log.Error().Err(err).Msg("fail to build")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build schema for stream %s: %v", meta.GetName(), err))
return
}
plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
if err != nil {
- p.log.Error().Err(err).Msg("fail to analyze the query request")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for stream %s: %v", meta.GetName(), err))
return
}
@@ -101,11 +101,10 @@ func (p *streamQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
entities, err := plan.(executor.StreamExecutable).Execute(ec)
if err != nil {
- p.log.Error().Err(err).Msg("fail to execute the query plan")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("execute the query plan for stream %s: %v", meta.GetName(),
err))
return
}
- now := time.Now().UnixNano()
resp = bus.NewMessage(bus.MessageID(now), entities)
return
@@ -118,8 +117,9 @@ type measureQueryProcessor struct {
func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
+ now := time.Now().UnixNano()
if !ok {
- p.queryService.log.Warn().Msg("invalid event data type")
+ resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
return
}
p.log.Debug().Msg("received a query event")
@@ -127,27 +127,25 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
meta := queryCriteria.GetMetadata()
ec, err := p.measureService.Measure(meta)
if err != nil {
- p.log.Error().Err(err).
- Str("measure", meta.GetName()).
- Msg("fail to get execution context")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to get execution context for measure %s: %v", meta.GetName(), err))
return
}
analyzer, err :=
logical_measure.CreateAnalyzerFromMetaService(p.metaService)
if err != nil {
- p.log.Error().Err(err).Msg("fail to build analyzer")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build analyzer for measure %s: %v", meta.GetName(), err))
return
}
s, err := analyzer.BuildSchema(context.TODO(), meta)
if err != nil {
- p.queryService.log.Error().Err(err).Msg("fail to build measure
schema")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to build schema for measure %s: %v", meta.GetName(), err))
return
}
plan, err := analyzer.Analyze(context.TODO(), queryCriteria, meta, s)
if err != nil {
- p.queryService.log.Error().Err(err).Msg("fail to analyze the
query request")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for measure %s: %v", meta.GetName(), err))
return
}
@@ -155,7 +153,7 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
mIterator, err := plan.(executor.MeasureExecutable).Execute(ec)
if err != nil {
- p.queryService.log.Error().Err(err).Msg("fail to execute the
query plan")
+ resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to execute the query plan for measure %s: %v", meta.GetName(), err))
return
}
defer func() {
@@ -170,7 +168,6 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
result = append(result, current[0])
}
}
- now := time.Now().UnixNano()
resp = bus.NewMessage(bus.MessageID(now), result)
return
}
diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go
index cf5e5d0..ba4b0e4 100644
--- a/bydbctl/internal/cmd/group.go
+++ b/bydbctl/internal/cmd/group.go
@@ -65,15 +65,68 @@ func newGroupCmd() *cobra.Command {
})
},
}
- bindFileFlag(createCmd)
+
+ updateCmd := &cobra.Command{
+ Use: "update -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Update groups from files",
+ RunE: func(cmd *cobra.Command, args []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseNameFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ g := new(common_v1.Group)
+ err :=
protojson.Unmarshal(request.data, g)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&database_v1.GroupRegistryServiceUpdateRequest{
+ Group: g,
+ }
+ b, err := json.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return
request.req.SetBody(b).SetPathParam("group",
request.name).Put(getPath("/api/v1/group/schema/{group}"))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("group %s is updated",
reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
+ bindFileFlag(createCmd, updateCmd)
+
+ getCmd := &cobra.Command{
+ Use: "get [-g group]",
+ Version: version.Build(),
+ Short: "Get a group",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/group/schema/{group}"))
+ }, yamlPrinter)
+ },
+ }
+
+ deleteCmd := &cobra.Command{
+ Use: "delete [-g group]",
+ Version: version.Build(),
+ Short: "Delete a group",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("group",
request.group).Delete(getPath("/api/v1/group/schema/{group}"))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("group %s is deleted",
reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
listCmd := &cobra.Command{
Use: "list",
Version: version.Build(),
- Short: "list all groups",
- PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
- return cmd.Parent().PersistentPreRunE(cmd.Parent(),
args)
- },
+ Short: "List all groups",
RunE: func(cmd *cobra.Command, args []string) (err error) {
return rest(nil, func(request request)
(*resty.Response, error) {
return
request.req.Get(getPath("/api/v1/group/schema/lists"))
@@ -81,7 +134,6 @@ func newGroupCmd() *cobra.Command {
},
}
- // todo:GroupGetCmd, GroupUpdateCmd, GroupDeleteCmd
- groupCmd.AddCommand(createCmd, listCmd)
+ groupCmd.AddCommand(createCmd, updateCmd, listCmd, getCmd, deleteCmd)
return groupCmd
}
diff --git a/bydbctl/internal/cmd/group_test.go
b/bydbctl/internal/cmd/group_test.go
new file mode 100644
index 0000000..9362e27
--- /dev/null
+++ b/bydbctl/internal/cmd/group_test.go
@@ -0,0 +1,150 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+var _ = Describe("Group", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ // extracting the operation of creating group
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+ shard_num: 2
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is created"))
+ })
+
+ It("get group", func() {
+ rootCmd.SetArgs([]string{"group", "get", "-g", "group1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.GroupRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Group.Metadata.Name).To(Equal("group1"))
+ })
+
+ It("update group", func() {
+ rootCmd.SetArgs([]string{"group", "update", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_STREAM
+resource_opts:
+ shard_num: 1
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is updated"))
+ })
+
+ It("delete group", func() {
+ rootCmd.SetArgs([]string{"group", "delete", "-g", "group1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is deleted"))
+ })
+
+ It("list group", func() {
+ // create another group for list operation
+ rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group2
+catalog: CATALOG_STREAM
+resource_opts:
+ shard_num: 2
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group2 is created"))
+ // list
+ rootCmd.SetArgs([]string{"group", "list"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.GroupRegistryServiceListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Group).To(HaveLen(4))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/measure.go
similarity index 69%
copy from bydbctl/internal/cmd/stream.go
copy to bydbctl/internal/cmd/measure.go
index 719ec32..9a66323 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/measure.go
@@ -23,51 +23,46 @@ import (
"github.com/go-resty/resty/v2"
"github.com/spf13/cobra"
- "github.com/spf13/viper"
"google.golang.org/protobuf/encoding/protojson"
database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/pkg/version"
)
-const streamSchemaPath = "/api/v1/stream/schema"
+const measureSchemaPath = "/api/v1/measure/schema"
-var streamSchemaPathWithParams = streamSchemaPath + "/{group}/{name}"
+var measureSchemaPathWithParams = measureSchemaPath + "/{group}/{name}"
-func getPath(path string) string {
- return viper.GetString("addr") + path
-}
-
-func newStreamCmd() *cobra.Command {
- streamCmd := &cobra.Command{
- Use: "stream",
+func newMeasureCmd() *cobra.Command {
+ measureCmd := &cobra.Command{
+ Use: "measure",
Version: version.Build(),
- Short: "Stream operation",
+ Short: "Measure operation",
}
createCmd := &cobra.Command{
Use: "create -f [file|dir|-]",
Version: version.Build(),
- Short: "Create streams from files",
+ Short: "Create measures from files",
RunE: func(cmd *cobra.Command, _ []string) error {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Stream)
+ s := new(database_v1.Measure)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.StreamRegistryServiceCreateRequest{
- Stream: s,
+ cr :=
&database_v1.MeasureRegistryServiceCreateRequest{
+ Measure: s,
}
b, err := json.Marshal(cr)
if err != nil {
return nil, err
}
- return
request.req.SetBody(b).Post(getPath(streamSchemaPath))
+ return
request.req.SetBody(b).Post(getPath(measureSchemaPath))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("stream %s.%s is created",
reqBody.group, reqBody.name)
+ fmt.Printf("measure %s.%s is created",
reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -77,17 +72,17 @@ func newStreamCmd() *cobra.Command {
updateCmd := &cobra.Command{
Use: "update -f [file|dir|-]",
Version: version.Build(),
- Short: "Update streams from files",
+ Short: "Update measures from files",
RunE: func(cmd *cobra.Command, _ []string) (err error) {
return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- s := new(database_v1.Stream)
+ s := new(database_v1.Measure)
err :=
protojson.Unmarshal(request.data, s)
if err != nil {
return nil, err
}
- cr :=
&database_v1.StreamRegistryServiceUpdateRequest{
- Stream: s,
+ cr :=
&database_v1.MeasureRegistryServiceUpdateRequest{
+ Measure: s,
}
b, err := json.Marshal(cr)
if err != nil {
@@ -95,24 +90,23 @@ func newStreamCmd() *cobra.Command {
}
return request.req.SetBody(b).
SetPathParam("name",
request.name).SetPathParam("group", request.group).
-
Put(getPath(streamSchemaPathWithParams))
+
Put(getPath(measureSchemaPathWithParams))
},
func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("stream %s.%s is updated",
reqBody.group, reqBody.name)
+ fmt.Printf("measure %s.%s is updated",
reqBody.group, reqBody.name)
fmt.Println()
return nil
})
},
}
- bindFileFlag(createCmd, updateCmd)
getCmd := &cobra.Command{
Use: "get [-g group] -n name",
Version: version.Build(),
- Short: "Get a stream",
+ Short: "Get a measure",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(streamSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(measureSchemaPathWithParams))
}, yamlPrinter)
},
}
@@ -120,12 +114,12 @@ func newStreamCmd() *cobra.Command {
deleteCmd := &cobra.Command{
Use: "delete [-g group] -n name",
Version: version.Build(),
- Short: "Delete a stream",
+ Short: "Delete a measure",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(streamSchemaPathWithParams))
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(measureSchemaPathWithParams))
}, func(_ int, reqBody reqBody, _ []byte) error {
- fmt.Printf("stream %s.%s is deleted",
reqBody.group, reqBody.name)
+ fmt.Printf("measure %s.%s is deleted",
reqBody.group, reqBody.name)
fmt.Println()
return nil
})
@@ -136,28 +130,28 @@ func newStreamCmd() *cobra.Command {
listCmd := &cobra.Command{
Use: "list [-g group]",
Version: version.Build(),
- Short: "List streams",
+ Short: "List measures",
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
- return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/measure/schema/lists/{group}"))
}, yamlPrinter)
},
}
queryCmd := &cobra.Command{
- Use: "query",
+ Use: "query -f [file|dir|-]",
Version: version.Build(),
- Short: "Query data in a stream",
- PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
- return cmd.Parent().PersistentPreRunE(cmd.Parent(),
args)
- },
+ Short: "Query data in a measure",
RunE: func(cmd *cobra.Command, _ []string) (err error) {
- return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
- return
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+ return
request.req.SetBody(request.data).Post(getPath("/api/v1/measure/data"))
}, yamlPrinter)
},
}
- streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
- return streamCmd
+ bindFileFlag(createCmd, updateCmd, queryCmd)
+ bindTimeRangeFlag(queryCmd)
+
+ measureCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
+ return measureCmd
}
diff --git a/bydbctl/internal/cmd/measure_test.go
b/bydbctl/internal/cmd/measure_test.go
new file mode 100644
index 0000000..b7e79c3
--- /dev/null
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -0,0 +1,220 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measure_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ cases_measure_data
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = Describe("Measure Schema Operation", func() {
+ var addr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ // extracting the operation of creating measure schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1
+catalog: CATALOG_MEASURE
+resource_opts:
+ shard_num: 2
+ block_interval:
+ unit: UNIT_HOUR
+ num: 2
+ segment_interval:
+ unit: UNIT_DAY
+ num: 1
+ ttl:
+ unit: UNIT_DAY
+ num: 7`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is created"))
+ rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f",
"-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1`))
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("measure group1.name1 is
created"))
+ })
+
+ It("get measure schema", func() {
+ rootCmd.SetArgs([]string{"measure", "get", "-g", "group1",
"-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.MeasureRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Measure.Metadata.Group).To(Equal("group1"))
+ Expect(resp.Measure.Metadata.Name).To(Equal("name1"))
+ })
+
+ It("update measure schema", func() {
+ rootCmd.SetArgs([]string{"measure", "update", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1
+entity:
+ tagNames: ["tag1"]`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("measure group1.name1 is
updated"))
+ rootCmd.SetArgs([]string{"measure", "get", "-g", "group1",
"-n", "name1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.MeasureRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Measure.Metadata.Group).To(Equal("group1"))
+ Expect(resp.Measure.Metadata.Name).To(Equal("name1"))
+ Expect(resp.Measure.Entity.TagNames[0]).To(Equal("tag1"))
+ })
+
+ It("delete measure schema", func() {
+ // delete
+ rootCmd.SetArgs([]string{"measure", "delete", "-g", "group1",
"-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("measure group1.name1 is
deleted"))
+ // get again
+ rootCmd.SetArgs([]string{"measure", "get", "-g", "group1",
"-n", "name1"})
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
+ })
+
+ It("list measure schema", func() {
+ // create another measure schema for list operation
+ rootCmd.SetArgs([]string{"measure", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("measure group1.name2 is
created"))
+ // list
+ rootCmd.SetArgs([]string{"measure", "list", "-g", "group1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.MeasureRegistryServiceListResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Measure).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
+
+var _ = Describe("Measure Data Query", func() {
+ var addr, grpcAddr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ grpcAddr, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ time.Sleep(1 * time.Second)
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ })
+
+ It("query measure data", func() {
+ conn, err := grpclib.Dial(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now := timestamp.NowMilli()
+ interval := 500 * time.Millisecond
+ end := now.Add(1 * time.Hour)
+ cases_measure_data.Write(conn, "service_cpm_minute",
"sw_metric", "service_cpm_minute_data.json", now, interval)
+ rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f",
"-"})
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+metadata:
+ name: service_cpm_minute
+ group: sw_metric
+timeRange:
+ begin: %s
+ end: %s
+tagProjection:
+ tagFamilies:
+ - name: default
+ tags:
+ - id`, now.Format(RFC3339), end.Format(RFC3339))))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ GinkgoWriter.Println(out)
+ resp := new(measure_v1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.DataPoints)
+ }).Should(Equal(6))
+ })
+
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index f6e746a..7a791c9 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -21,21 +21,34 @@ import (
"encoding/json"
"fmt"
"io"
+ "time"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/spf13/viper"
+ str2duration "github.com/xhit/go-str2duration/v2"
+ "go.uber.org/multierr"
+ stpb "google.golang.org/genproto/googleapis/rpc/status"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"sigs.k8s.io/yaml"
"github.com/apache/skywalking-banyandb/bydbctl/pkg/file"
)
+const (
+ // RFC3339 refers to https://www.rfc-editor.org/rfc/rfc3339
+ RFC3339 = "2006-01-02T15:04:05Z07:00"
+ timeRange = 30 * time.Minute
+)
+
var errMalformedInput = errors.New("malformed input")
type reqBody struct {
- name string
- group string
- data []byte
+ name string
+ group string
+ parsedData map[string]interface{}
+ data []byte
}
type request struct {
@@ -86,10 +99,15 @@ func parseFromYAML(tryParseGroup bool, reader io.Reader)
(requests []reqBody, er
if !ok {
return nil, errors.WithMessage(errMalformedInput,
"absent node: name in metadata")
}
+ j, err = json.Marshal(data)
+ if err != nil {
+ return nil, err
+ }
requests = append(requests, reqBody{
- name: name,
- group: group,
- data: j,
+ name: name,
+ group: group,
+ data: j,
+ parsedData: data,
})
}
return requests, nil
@@ -111,6 +129,65 @@ func parseGroupFromFlags() ([]reqBody, error) {
return []reqBody{{group: group}}, nil
}
+// parseTimeRangeFromFlagAndYAML parses query requests from reader and, for
+// every request that carries no "timeRange" node of its own, injects one
+// derived from the --start/--end flags.
+//
+// Flag resolution:
+//   - neither flag set: the range is [now-timeRange, now];
+//   - both set: each flag is parsed by parseTime;
+//   - only --start: the range is [start, start+timeRange];
+//   - only --end: the range is [end-timeRange, end].
+//
+// It returns the parsed requests, or the first error raised while parsing the
+// flags, reading the YAML, or re-encoding a request body.
+func parseTimeRangeFromFlagAndYAML(reader io.Reader) (requests []reqBody, err error) {
+	var startTS, endTS time.Time
+	switch {
+	case start == "" && end == "":
+		// Read the clock once so the default window is exactly timeRange wide.
+		endTS = time.Now()
+		startTS = endTS.Add(-timeRange)
+	case start != "" && end != "":
+		if startTS, err = parseTime(start); err != nil {
+			return nil, err
+		}
+		// The end bound comes from the --end flag, not from --start.
+		if endTS, err = parseTime(end); err != nil {
+			return nil, err
+		}
+	case start != "":
+		if startTS, err = parseTime(start); err != nil {
+			return nil, err
+		}
+		endTS = startTS.Add(timeRange)
+	default:
+		if endTS, err = parseTime(end); err != nil {
+			return nil, err
+		}
+		startTS = endTS.Add(-timeRange)
+	}
+	if requests, err = parseNameAndGroupFromYAML(reader); err != nil {
+		return nil, err
+	}
+	beginStr := startTS.Format(RFC3339)
+	endStr := endTS.Format(RFC3339)
+	// Index into the slice: ranging by value would update a copy of each
+	// reqBody and silently drop the re-encoded data.
+	for i := range requests {
+		rb := &requests[i]
+		if rb.parsedData["timeRange"] != nil {
+			// A range given explicitly in the YAML wins over the flags.
+			continue
+		}
+		// The query YAML names the lower bound "begin" (see the test inputs).
+		rb.parsedData["timeRange"] = map[string]interface{}{
+			"begin": beginStr,
+			"end":   endStr,
+		}
+		if rb.data, err = json.Marshal(rb.parsedData); err != nil {
+			return nil, err
+		}
+	}
+	return requests, nil
+}
+
+// parseTime converts a --start/--end flag value into an absolute time.
+//
+// The value is first tried as an absolute RFC3339 timestamp. If that fails it
+// is parsed as a duration by str2duration.ParseDuration and added to the
+// current time (so a negative duration lies in the past). An empty value, or
+// one that matches neither form, yields an error that combines both parse
+// failures.
+func parseTime(timestamp string) (time.Time, error) {
+	if timestamp == "" {
+		return time.Time{}, errors.New("time is empty")
+	}
+	// time.Parse takes the layout first and the value second.
+	t, errAbsoluteTime := time.Parse(RFC3339, timestamp)
+	if errAbsoluteTime == nil {
+		return t, nil
+	}
+	duration, err := str2duration.ParseDuration(timestamp)
+	if err != nil {
+		return time.Time{}, errors.WithMessagef(multierr.Combine(errAbsoluteTime, err), "time %s is neither absolute time nor relative time", timestamp)
+	}
+	return time.Now().Add(duration), nil
+}
+
type printer func(index int, reqBody reqBody, body []byte) error
func yamlPrinter(index int, _ reqBody, body []byte) error {
@@ -146,7 +223,14 @@ func rest(pfn paramsFn, fn reqFn, printer printer) (err
error) {
if err != nil {
return err
}
- err = printer(i, r, resp.Body())
+ bd := resp.Body()
+ var st *stpb.Status
+ err = json.Unmarshal(bd, &st)
+ if err == nil && st.Code != int32(codes.OK) {
+ s := status.FromProto(st)
+ return s.Err()
+ }
+ err = printer(i, r, bd)
if err != nil {
return err
}
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index d6669a1..285afa1 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -31,6 +31,8 @@ import (
var (
filePath string
name string
+ start string
+ end string
cfgFile string
rootCmd = &cobra.Command{
DisableAutoGenTag: true,
@@ -53,7 +55,7 @@ func RootCmdFlags(command *cobra.Command) {
_ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
viper.SetDefault("addr", "http://localhost:17913")
- command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd())
+ command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd(),
newMeasureCmd())
}
func init() {
@@ -108,3 +110,10 @@ func bindNameFlag(commands ...*cobra.Command) {
_ = c.MarkFlagRequired("name")
}
}
+
+// bindTimeRangeFlag registers the --start/-s and --end/-e string flags on each
+// of the given commands. Both default to "" and are stored in the
+// package-level start and end variables.
+func bindTimeRangeFlag(commands ...*cobra.Command) {
+	for _, c := range commands {
+		c.Flags().StringVarP(&start, "start", "s", "", "Start time of the time range during which the query is performed")
+		c.Flags().StringVarP(&end, "end", "e", "", "End time of the time range during which the query is performed")
+	}
+}
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
index 719ec32..6db1a94 100644
--- a/bydbctl/internal/cmd/stream.go
+++ b/bydbctl/internal/cmd/stream.go
@@ -104,7 +104,6 @@ func newStreamCmd() *cobra.Command {
})
},
}
- bindFileFlag(createCmd, updateCmd)
getCmd := &cobra.Command{
Use: "get [-g group] -n name",
@@ -145,19 +144,19 @@ func newStreamCmd() *cobra.Command {
}
queryCmd := &cobra.Command{
- Use: "query",
+ Use: "query -f [file|dir|-]",
Version: version.Build(),
Short: "Query data in a stream",
- PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
- return cmd.Parent().PersistentPreRunE(cmd.Parent(),
args)
- },
RunE: func(cmd *cobra.Command, _ []string) (err error) {
- return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ return rest(func() ([]reqBody, error) { return
parseTimeRangeFromFlagAndYAML(cmd.InOrStdin()) },
func(request request) (*resty.Response, error) {
return
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
}, yamlPrinter)
},
}
+ bindFileFlag(createCmd, updateCmd, queryCmd)
+ bindTimeRangeFlag(queryCmd)
+
streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
return streamCmd
}
diff --git a/bydbctl/internal/cmd/stream_test.go
b/bydbctl/internal/cmd/stream_test.go
index 966d99f..0d18c51 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -18,52 +18,42 @@
package cmd_test
import (
- "context"
+ "fmt"
"strings"
"time"
- "github.com/ghodss/yaml"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/cobra"
"github.com/zenizh/go-capturer"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/discovery"
- "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
- "github.com/apache/skywalking-banyandb/banyand/liaison/http"
- "github.com/apache/skywalking-banyandb/banyand/measure"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/query"
- "github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/banyand/stream"
+ stream_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
- "github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ cases_stream_data
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
)
-var _ = Describe("Stream", func() {
- var path string
- var gracefulStop, deferFunc func()
- var listenClientURL, listenPeerURL string
+const (
+ RFC3339 = "2006-01-02T15:04:05Z07:00"
+)
+
+var _ = Describe("Stream Schema Operation", func() {
+ var addr string
+ var deferFunc func()
var rootCmd *cobra.Command
BeforeEach(func() {
- var err error
- path, deferFunc, err = test.NewSpace()
- Expect(err).NotTo(HaveOccurred())
- listenClientURL, listenPeerURL, err = test.NewEtcdListenUrls()
- Expect(err).NotTo(HaveOccurred())
- flags := []string{
- "--stream-root-path=" + path, "--measure-root-path=" +
path, "--metadata-root-path=" + path,
- "--etcd-listen-client-url=" + listenClientURL,
"--etcd-listen-peer-url=" + listenPeerURL,
- }
- gracefulStop = setup(false, flags)
- Eventually(helpers.HTTPHealthCheck("localhost:17913"),
10*time.Second).Should(Succeed())
- time.Sleep(1 * time.Second)
+ _, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
// extracting the operation of creating stream schema
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
- rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+ rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f",
"-"})
rootCmd.SetIn(strings.NewReader(`
metadata:
name: group1
@@ -84,11 +74,16 @@ resource_opts:
Expect(err).NotTo(HaveOccurred())
})
Expect(out).To(ContainSubstring("group group1 is created"))
- rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+ rootCmd.SetArgs([]string{"stream", "create", "-a", addr, "-f",
"-"})
rootCmd.SetIn(strings.NewReader(`
metadata:
name: name1
- group: group1`))
+ group: group1
+tagFamilies:
+ - name: searchable
+ tags:
+ - name: trace_id
+ type: TAG_TYPE_STRING`))
out = capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
@@ -102,8 +97,9 @@ metadata:
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
})
+ GinkgoWriter.Println(out)
resp := new(database_v1.StreamRegistryServiceGetResponse)
- Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+ helpers.UnmarshalYAML([]byte(out), resp)
Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
})
@@ -143,11 +139,8 @@ entity:
Expect(out).To(ContainSubstring("stream group1.name1 is
deleted"))
// get again
rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n",
"name1"})
- out = capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("resource not found"))
+ err := rootCmd.Execute()
+ Expect(err).To(MatchError("rpc error: code = NotFound desc =
banyandb: resource not found"))
})
It("list stream schema", func() {
@@ -169,78 +162,117 @@ metadata:
Expect(err).NotTo(HaveOccurred())
})
resp := new(database_v1.StreamRegistryServiceListResponse)
- Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+ helpers.UnmarshalYAML([]byte(out), resp)
Expect(resp.Stream).To(HaveLen(2))
})
AfterEach(func() {
- gracefulStop()
deferFunc()
})
})
-func setup(loadMetadata bool, flags []string) func() {
- // Init `Discovery` module
- repo, err := discovery.NewServiceRepo(context.Background())
- Expect(err).NotTo(HaveOccurred())
- // Init `Queue` module
- pipeline, err := queue.NewQueue(context.TODO(), repo)
- Expect(err).NotTo(HaveOccurred())
- // Init `Metadata` module
- metaSvc, err := metadata.NewService(context.TODO())
- Expect(err).NotTo(HaveOccurred())
- // Init `Stream` module
- streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo,
pipeline)
- Expect(err).NotTo(HaveOccurred())
- // Init `Measure` module
- measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo,
pipeline)
- Expect(err).NotTo(HaveOccurred())
- // Init `Query` module
- q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc,
metaSvc, repo, pipeline)
- Expect(err).NotTo(HaveOccurred())
- tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+var _ = Describe("Stream Data Query", func() {
+ var addr, grpcAddr string
+ var deferFunc func()
+ var rootCmd *cobra.Command
+ var now time.Time
+ var nowStr, endStr string
+ var interval time.Duration
+ BeforeEach(func() {
+ now = timestamp.NowMilli()
+ nowStr = now.Format(RFC3339)
+ interval = 500 * time.Millisecond
+ endStr = now.Add(1 * time.Hour).Format(RFC3339)
+ grpcAddr, addr, deferFunc = setup.SetUp()
+ Eventually(helpers.HTTPHealthCheck(addr),
10*time.Second).Should(Succeed())
+ addr = "http://" + addr
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ })
- httpServer := http.NewService()
- if loadMetadata {
- return test.SetUpModules(
- flags,
- repo,
- pipeline,
- metaSvc,
- &preloadStreamService{metaSvc: metaSvc},
- &preloadMeasureService{metaSvc: metaSvc},
- streamSvc,
- measureSvc,
- q,
- tcp,
- httpServer,
+ It("query stream data", func() {
+ conn, err := grpclib.Dial(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
- }
- return test.SetUpModules(
- flags,
- repo,
- pipeline,
- metaSvc,
- streamSvc,
- measureSvc,
- q,
- tcp,
- httpServer,
- )
-}
-
-type preloadStreamService struct {
- metaSvc metadata.Service
-}
-
-type preloadMeasureService struct {
- metaSvc metadata.Service
-}
+ Expect(err).NotTo(HaveOccurred())
-func (p *preloadStreamService) Name() string {
- return "preload-stream"
-}
+ cases_stream_data.Write(conn, "data.json", now, interval)
+ rootCmd.SetArgs([]string{"stream", "query", "-a", addr, "-f",
"-"})
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+metadata:
+ name: sw
+ group: default
+timeRange:
+ begin: %s
+ end: %s
+projection:
+ tagFamilies:
+ - name: searchable
+ tags:
+ - trace_id`, nowStr, endStr)))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ resp := new(stream_v1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Elements)
+ }).Should(Equal(5))
+ })
+ DescribeTable("query stream data with time range flags", func(timeArgs
...string) {
+ conn, err := grpclib.Dial(
+ grpcAddr,
+
grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now := timestamp.NowMilli()
+ interval := 500 * time.Millisecond
+ cases_stream_data.Write(conn, "data.json", now, interval)
+ args := []string{"stream", "query", "-a", addr}
+ args = append(args, timeArgs...)
+ args = append(args, "-f", "-")
+ rootCmd.SetArgs(args)
+ issue := func() string {
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: sw
+ group: default
+projection:
+ tagFamilies:
+ - name: searchable
+ tags:
+ - trace_id`))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ }
+ Eventually(issue).ShouldNot(ContainSubstring("code:"))
+ Eventually(func() int {
+ out := issue()
+ resp := new(stream_v1.QueryResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ GinkgoWriter.Println(resp)
+ return len(resp.Elements)
+ }).Should(Equal(5))
+ },
+ Entry("relative start", "--start", "-30m"),
+ Entry("relative end", "--end", "0m"),
+ Entry("absolute start", "--start", nowStr),
+ Entry("absolute end", "--end", endStr),
+ Entry("default"),
+ Entry("all relative", "--start", "-30m", "--end", "0m"),
+ Entry("all absolute", "--start", nowStr, "--end", endStr),
+ )
-func (p *preloadMeasureService) Name() string {
- return "preload-measure"
-}
+ AfterEach(func() {
+ deferFunc()
+ })
+})
diff --git a/dist/LICENSE b/dist/LICENSE
index 6074210..e41d8fb 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -264,6 +264,7 @@ BSD-3-Clause licenses
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause
github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause
github.com/spf13/pflag v1.0.5 BSD-3-Clause
+ github.com/xhit/go-str2duration/v2 v2.0.0 BSD-3-Clause
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause
golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause
@@ -336,7 +337,6 @@ MIT and Apache-2.0 licenses
MIT and BSD-3-Clause licenses
========================================================================
- github.com/ghodss/yaml v1.0.0 MIT and BSD-3-Clause
sigs.k8s.io/yaml v1.2.0 MIT and BSD-3-Clause
========================================================================
diff --git a/dist/licenses/license-github.com-ghodss-yaml.txt
b/dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
similarity index 55%
rename from dist/licenses/license-github.com-ghodss-yaml.txt
rename to dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
index 7805d36..ea5ea89 100644
--- a/dist/licenses/license-github.com-ghodss-yaml.txt
+++ b/dist/licenses/license-github.com-xhit-go-str2duration-v2.txt
@@ -1,27 +1,4 @@
-The MIT License (MIT)
-
-Copyright (c) 2014 Sam Ghods
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
-
-
-Copyright (c) 2012 The Go Authors. All rights reserved.
+Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
@@ -47,4 +24,4 @@ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 2abaffb..173cd7c 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,6 @@ require (
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/dgraph-io/ristretto v0.1.0
github.com/envoyproxy/protoc-gen-validate v0.1.0
- github.com/ghodss/yaml v1.0.0
github.com/go-chi/chi/v5 v5.0.7
github.com/go-resty/resty/v2 v2.7.0
github.com/golang/mock v1.6.0
@@ -30,6 +29,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.7.1
+ github.com/xhit/go-str2duration/v2 v2.0.0
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
diff --git a/go.sum b/go.sum
index 044c43c..df29d43 100644
--- a/go.sum
+++ b/go.sum
@@ -172,7 +172,6 @@ github.com/fsnotify/fsnotify v1.5.4
h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwV
github.com/fsnotify/fsnotify v1.5.4/go.mod
h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/getsentry/raven-go v0.2.0
h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
-github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-chi/chi/v5 v5.0.7/go.mod
h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
@@ -496,6 +495,8 @@ github.com/tmc/grpc-websocket-proxy
v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802
h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod
h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod
h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
+github.com/xhit/go-str2duration/v2 v2.0.0
h1:uFtk6FWB375bP7ewQl+/1wBcn840GPhnySOdcz/okPE=
+github.com/xhit/go-str2duration/v2 v2.0.0/go.mod
h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod
h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod
h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
diff --git a/pkg/test/helpers/http_health.go b/pkg/test/helpers/http_health.go
index ec6aa2f..47fa01f 100644
--- a/pkg/test/helpers/http_health.go
+++ b/pkg/test/helpers/http_health.go
@@ -18,6 +18,7 @@ package helpers
import (
"fmt"
+ "time"
"github.com/go-resty/resty/v2"
)
@@ -30,11 +31,13 @@ func HTTPHealthCheck(addr string) func() error {
SetHeader("Accept", "application/json").
Get(fmt.Sprintf("http://%s/api/healthz", addr))
if err != nil {
+ time.Sleep(1 * time.Second)
return err
}
if resp.StatusCode() != 200 {
l.Warn().Str("responded_status",
resp.Status()).Msg("service unhealthy")
+ time.Sleep(1 * time.Second)
return ErrServiceUnhealthy
}
l.Info().Stringer("response", resp).Msg("connected")
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 50997c3..ee5b011 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/http"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/query"
@@ -37,15 +38,18 @@ import (
const host = "127.0.0.1"
-func SetUp(flags ...string) (string, func()) {
+func SetUp(flags ...string) (string, string, func()) {
path, deferFn, err := test.NewSpace()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
var ports []int
ports, err = test.AllocateFreePorts(4)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
addr := fmt.Sprintf("%s:%d", host, ports[0])
+ httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
ff := []string{
"--addr=" + addr,
+ "--http-addr=" + httpAddr,
+ "--grpc-addr=" + addr,
"--stream-root-path=" + path,
"--measure-root-path=" + path,
"--metadata-root-path=" + path,
@@ -55,7 +59,7 @@ func SetUp(flags ...string) (string, func()) {
ff = append(ff, flags...)
}
gracefulStop := modules(ff)
- return addr, func() {
+ return addr, httpAddr, func() {
gracefulStop()
deferFn()
}
@@ -81,6 +85,7 @@ func modules(flags []string) func() {
q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc,
metaSvc, repo, pipeline)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+ httpServer := http.NewService()
return test.SetUpModules(
flags,
@@ -93,6 +98,7 @@ func modules(flags []string) func() {
measureSvc,
q,
tcp,
+ httpServer,
)
}
diff --git a/test/cases/stream/stream.go b/test/cases/stream/data/data.go
similarity index 52%
copy from test/cases/stream/stream.go
copy to test/cases/stream/data/data.go
index 600862d..07b07f8 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/data/data.go
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// Package stream_test contains integration test cases of the stream
-package stream_test
+// Package data contains integration test cases of the stream
+package data
import (
"context"
@@ -24,7 +24,6 @@ import (
"encoding/base64"
"encoding/json"
"io"
- "math"
"strconv"
"time"
@@ -37,28 +36,29 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"sigs.k8s.io/yaml"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ common_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ model_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ stream_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
)
-// SharedContext is the parallel execution context
-var SharedContext helpers.SharedContext
-
//go:embed input/*.yaml
var inputFS embed.FS
//go:embed want/*.yaml
var wantFS embed.FS
-var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// VerifyFn verifies whether the query response matches the wanted result.
+var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
- innerGm.Expect(err).NotTo(gm.HaveOccurred())
- query := &streamv1.QueryRequest{}
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ query := &stream_v1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
- query.TimeRange = helpers.TimeRange(args, SharedContext)
- c := streamv1.NewStreamServiceClient(SharedContext.Connection)
+ query.TimeRange = helpers.TimeRange(args, sharedContext)
+ c := stream_v1.NewStreamServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
if args.WantErr {
@@ -67,21 +67,21 @@ var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
}
return
}
- innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+ gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
if args.WantEmpty {
- innerGm.Expect(resp.Elements).To(gm.BeEmpty())
+ gm.Expect(resp.Elements).To(gm.BeEmpty())
return
}
if args.Want == "" {
args.Want = args.Input
}
ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
- innerGm.Expect(err).NotTo(gm.HaveOccurred())
- want := &streamv1.QueryResponse{}
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ want := &stream_v1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
- innerGm.Expect(cmp.Equal(resp, want,
+ gm.Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
- protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
+ protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
j, err := protojson.Marshal(resp)
@@ -96,36 +96,7 @@ var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
})
}
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
- gm.Eventually(func(innerGm gm.Gomega) {
- verifyFn(innerGm, args)
- })
-},
- g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 *
time.Hour}),
- g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
- g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 *
time.Hour}),
- g.Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
- g.Entry("invalid time range", helpers.Args{
- Input: "all",
- Begin: timestamppb.New(time.Unix(0,
int64(math.MinInt64+time.Millisecond)).Truncate(time.Millisecond)),
- End: timestamppb.New(time.Unix(0,
math.MaxInt64).Truncate(time.Millisecond)),
- }),
- g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 *
time.Hour}),
- g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1
* time.Hour}),
- g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag",
Duration: 1 * time.Hour}),
- g.Entry("get empty result by non-indexed tag", helpers.Args{Input:
"filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
- g.Entry("numeric local index: less", helpers.Args{Input: "less",
Duration: 1 * time.Hour}),
- g.Entry("numeric local index: less and eq", helpers.Args{Input:
"less_eq", Duration: 1 * time.Hour}),
- g.Entry("logical expression", helpers.Args{Input: "logical", Duration:
1 * time.Hour}),
- g.Entry("having", helpers.Args{Input: "having", Duration: 1 *
time.Hour}),
- g.Entry("full text searching", helpers.Args{Input: "search", Duration:
1 * time.Hour}),
- g.Entry("indexed only tags", helpers.Args{Input: "indexed_only",
Duration: 1 * time.Hour}),
-)
-
-//go:embed testdata/*.json
-var dataFS embed.FS
-
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string,
baseTime time.Time, interval time.Duration) {
+func loadData(stream stream_v1.StreamService_WriteClient, dataFile string,
baseTime time.Time, interval time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
@@ -134,16 +105,16 @@ func loadData(stream streamv1.StreamService_WriteClient,
dataFile string, baseTi
for i, template := range templates {
rawSearchTagFamily, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
- searchTagFamily := &modelv1.TagFamilyForWrite{}
+ searchTagFamily := &model_v1.TagFamilyForWrite{}
gm.Expect(protojson.Unmarshal(rawSearchTagFamily,
searchTagFamily)).ShouldNot(gm.HaveOccurred())
- e := &streamv1.ElementValue{
+ e := &stream_v1.ElementValue{
ElementId: strconv.Itoa(i),
Timestamp: timestamppb.New(baseTime.Add(interval *
time.Duration(i))),
- TagFamilies: []*modelv1.TagFamilyForWrite{
+ TagFamilies: []*model_v1.TagFamilyForWrite{
{
- Tags: []*modelv1.TagValue{
+ Tags: []*model_v1.TagValue{
{
- Value:
&modelv1.TagValue_BinaryData{
+ Value:
&model_v1.TagValue_BinaryData{
BinaryData: bb,
},
},
@@ -152,8 +123,8 @@ func loadData(stream streamv1.StreamService_WriteClient,
dataFile string, baseTi
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
- errInner := stream.Send(&streamv1.WriteRequest{
- Metadata: &commonv1.Metadata{
+ errInner := stream.Send(&stream_v1.WriteRequest{
+ Metadata: &common_v1.Metadata{
Name: "sw",
Group: "default",
},
@@ -165,7 +136,7 @@ func loadData(stream streamv1.StreamService_WriteClient,
dataFile string, baseTi
// Write data into the server
func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time,
interval time.Duration) {
- c := streamv1.NewStreamServiceClient(conn)
+ c := stream_v1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/cases/stream/input/all.yaml
b/test/cases/stream/data/input/all.yaml
similarity index 100%
rename from test/cases/stream/input/all.yaml
rename to test/cases/stream/data/input/all.yaml
diff --git a/test/cases/stream/input/filter_tag.yaml
b/test/cases/stream/data/input/filter_tag.yaml
similarity index 100%
rename from test/cases/stream/input/filter_tag.yaml
rename to test/cases/stream/data/input/filter_tag.yaml
diff --git a/test/cases/stream/input/filter_tag_empty.yaml
b/test/cases/stream/data/input/filter_tag_empty.yaml
similarity index 100%
rename from test/cases/stream/input/filter_tag_empty.yaml
rename to test/cases/stream/data/input/filter_tag_empty.yaml
diff --git a/test/cases/stream/input/global_index.yaml
b/test/cases/stream/data/input/global_index.yaml
similarity index 100%
rename from test/cases/stream/input/global_index.yaml
rename to test/cases/stream/data/input/global_index.yaml
diff --git a/test/cases/stream/input/having.yaml
b/test/cases/stream/data/input/having.yaml
similarity index 100%
rename from test/cases/stream/input/having.yaml
rename to test/cases/stream/data/input/having.yaml
diff --git a/test/cases/stream/input/indexed_only.yaml
b/test/cases/stream/data/input/indexed_only.yaml
similarity index 100%
rename from test/cases/stream/input/indexed_only.yaml
rename to test/cases/stream/data/input/indexed_only.yaml
diff --git a/test/cases/stream/input/less.yaml
b/test/cases/stream/data/input/less.yaml
similarity index 100%
rename from test/cases/stream/input/less.yaml
rename to test/cases/stream/data/input/less.yaml
diff --git a/test/cases/stream/input/less_eq.yaml
b/test/cases/stream/data/input/less_eq.yaml
similarity index 100%
rename from test/cases/stream/input/less_eq.yaml
rename to test/cases/stream/data/input/less_eq.yaml
diff --git a/test/cases/stream/input/limit.yaml
b/test/cases/stream/data/input/limit.yaml
similarity index 100%
rename from test/cases/stream/input/limit.yaml
rename to test/cases/stream/data/input/limit.yaml
diff --git a/test/cases/stream/input/logical.yaml
b/test/cases/stream/data/input/logical.yaml
similarity index 100%
rename from test/cases/stream/input/logical.yaml
rename to test/cases/stream/data/input/logical.yaml
diff --git a/test/cases/stream/input/offset.yaml
b/test/cases/stream/data/input/offset.yaml
similarity index 100%
rename from test/cases/stream/input/offset.yaml
rename to test/cases/stream/data/input/offset.yaml
diff --git a/test/cases/stream/input/search.yaml
b/test/cases/stream/data/input/search.yaml
similarity index 100%
rename from test/cases/stream/input/search.yaml
rename to test/cases/stream/data/input/search.yaml
diff --git a/test/cases/stream/input/sort_desc.yaml
b/test/cases/stream/data/input/sort_desc.yaml
similarity index 100%
rename from test/cases/stream/input/sort_desc.yaml
rename to test/cases/stream/data/input/sort_desc.yaml
diff --git a/test/cases/stream/testdata/data.json
b/test/cases/stream/data/testdata/data.json
similarity index 100%
rename from test/cases/stream/testdata/data.json
rename to test/cases/stream/data/testdata/data.json
diff --git a/test/cases/stream/want/all.yaml
b/test/cases/stream/data/want/all.yaml
similarity index 100%
rename from test/cases/stream/want/all.yaml
rename to test/cases/stream/data/want/all.yaml
diff --git a/test/cases/stream/want/filter_tag.yaml
b/test/cases/stream/data/want/filter_tag.yaml
similarity index 100%
rename from test/cases/stream/want/filter_tag.yaml
rename to test/cases/stream/data/want/filter_tag.yaml
diff --git a/test/cases/stream/want/global_index.yaml
b/test/cases/stream/data/want/global_index.yaml
similarity index 100%
rename from test/cases/stream/want/global_index.yaml
rename to test/cases/stream/data/want/global_index.yaml
diff --git a/test/cases/stream/want/having.yaml
b/test/cases/stream/data/want/having.yaml
similarity index 100%
rename from test/cases/stream/want/having.yaml
rename to test/cases/stream/data/want/having.yaml
diff --git a/test/cases/stream/want/indexed_only.yaml
b/test/cases/stream/data/want/indexed_only.yaml
similarity index 100%
rename from test/cases/stream/want/indexed_only.yaml
rename to test/cases/stream/data/want/indexed_only.yaml
diff --git a/test/cases/stream/want/less.yaml
b/test/cases/stream/data/want/less.yaml
similarity index 100%
rename from test/cases/stream/want/less.yaml
rename to test/cases/stream/data/want/less.yaml
diff --git a/test/cases/stream/want/less_eq.yaml
b/test/cases/stream/data/want/less_eq.yaml
similarity index 100%
rename from test/cases/stream/want/less_eq.yaml
rename to test/cases/stream/data/want/less_eq.yaml
diff --git a/test/cases/stream/want/limit.yaml
b/test/cases/stream/data/want/limit.yaml
similarity index 100%
rename from test/cases/stream/want/limit.yaml
rename to test/cases/stream/data/want/limit.yaml
diff --git a/test/cases/stream/want/logical.yaml
b/test/cases/stream/data/want/logical.yaml
similarity index 100%
rename from test/cases/stream/want/logical.yaml
rename to test/cases/stream/data/want/logical.yaml
diff --git a/test/cases/stream/want/offset.yaml
b/test/cases/stream/data/want/offset.yaml
similarity index 100%
rename from test/cases/stream/want/offset.yaml
rename to test/cases/stream/data/want/offset.yaml
diff --git a/test/cases/stream/want/search.yaml
b/test/cases/stream/data/want/search.yaml
similarity index 100%
rename from test/cases/stream/want/search.yaml
rename to test/cases/stream/data/want/search.yaml
diff --git a/test/cases/stream/want/sort_desc.yaml
b/test/cases/stream/data/want/sort_desc.yaml
similarity index 100%
rename from test/cases/stream/want/sort_desc.yaml
rename to test/cases/stream/data/want/sort_desc.yaml
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 600862d..88f7ab8 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -19,88 +19,25 @@
package stream_test
import (
- "context"
- "embed"
- "encoding/base64"
- "encoding/json"
- "io"
"math"
- "strconv"
"time"
- "github.com/google/go-cmp/cmp"
g "github.com/onsi/ginkgo/v2"
- gm "github.com/onsi/gomega"
- grpclib "google.golang.org/grpc"
- "google.golang.org/protobuf/encoding/protojson"
- "google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
- "sigs.k8s.io/yaml"
- commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ stream_test_data
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
)
-// SharedContext is the parallel execution context
-var SharedContext helpers.SharedContext
-
-//go:embed input/*.yaml
-var inputFS embed.FS
-
-//go:embed want/*.yaml
-var wantFS embed.FS
-
-var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
- i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
- innerGm.Expect(err).NotTo(gm.HaveOccurred())
- query := &streamv1.QueryRequest{}
- helpers.UnmarshalYAML(i, query)
- query.TimeRange = helpers.TimeRange(args, SharedContext)
- c := streamv1.NewStreamServiceClient(SharedContext.Connection)
- ctx := context.Background()
- resp, err := c.Query(ctx, query)
- if args.WantErr {
- if err == nil {
- g.Fail("expect error")
- }
- return
- }
- innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
- if args.WantEmpty {
- innerGm.Expect(resp.Elements).To(gm.BeEmpty())
- return
- }
- if args.Want == "" {
- args.Want = args.Input
+var (
+ // SharedContext is the parallel execution context
+ SharedContext helpers.SharedContext
+ verify = func(args helpers.Args) {
+ stream_test_data.VerifyFn(SharedContext, args)
}
- ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
- innerGm.Expect(err).NotTo(gm.HaveOccurred())
- want := &streamv1.QueryResponse{}
- helpers.UnmarshalYAML(ww, want)
- innerGm.Expect(cmp.Equal(resp, want,
- protocmp.IgnoreUnknown(),
- protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
- protocmp.Transform())).
- To(gm.BeTrue(), func() string {
- j, err := protojson.Marshal(resp)
- if err != nil {
- return err.Error()
- }
- y, err := yaml.JSONToYAML(j)
- if err != nil {
- return err.Error()
- }
- return string(y)
- })
-}
+)
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
- gm.Eventually(func(innerGm gm.Gomega) {
- verifyFn(innerGm, args)
- })
-},
+var _ = g.DescribeTable("Scanning Streams", verify,
g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 *
time.Hour}),
g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 *
time.Hour}),
@@ -121,58 +58,3 @@ var _ = g.DescribeTable("Scanning Streams", func(args
helpers.Args) {
g.Entry("full text searching", helpers.Args{Input: "search", Duration:
1 * time.Hour}),
g.Entry("indexed only tags", helpers.Args{Input: "indexed_only",
Duration: 1 * time.Hour}),
)
-
-//go:embed testdata/*.json
-var dataFS embed.FS
-
-func loadData(stream streamv1.StreamService_WriteClient, dataFile string,
baseTime time.Time, interval time.Duration) {
- var templates []interface{}
- content, err := dataFS.ReadFile("testdata/" + dataFile)
- gm.Expect(err).ShouldNot(gm.HaveOccurred())
- gm.Expect(json.Unmarshal(content,
&templates)).ShouldNot(gm.HaveOccurred())
- bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
- for i, template := range templates {
- rawSearchTagFamily, errMarshal := json.Marshal(template)
- gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
- searchTagFamily := &modelv1.TagFamilyForWrite{}
- gm.Expect(protojson.Unmarshal(rawSearchTagFamily,
searchTagFamily)).ShouldNot(gm.HaveOccurred())
- e := &streamv1.ElementValue{
- ElementId: strconv.Itoa(i),
- Timestamp: timestamppb.New(baseTime.Add(interval *
time.Duration(i))),
- TagFamilies: []*modelv1.TagFamilyForWrite{
- {
- Tags: []*modelv1.TagValue{
- {
- Value:
&modelv1.TagValue_BinaryData{
- BinaryData: bb,
- },
- },
- },
- },
- },
- }
- e.TagFamilies = append(e.TagFamilies, searchTagFamily)
- errInner := stream.Send(&streamv1.WriteRequest{
- Metadata: &commonv1.Metadata{
- Name: "sw",
- Group: "default",
- },
- Element: e,
- })
- gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
- }
-}
-
-// Write data into the server
-func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time,
interval time.Duration) {
- c := streamv1.NewStreamServiceClient(conn)
- ctx := context.Background()
- writeClient, err := c.Write(ctx)
- gm.Expect(err).NotTo(gm.HaveOccurred())
- loadData(writeClient, dataFile, baseTime, interval)
- gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
- gm.Eventually(func() error {
- _, err := writeClient.Recv()
- return err
- }).Should(gm.Equal(io.EOF))
-}
diff --git a/test/integration/cold_query/query_suite_test.go
b/test/integration/cold_query/query_suite_test.go
index 3971560..9b35b6c 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -21,8 +21,8 @@ import (
"testing"
"time"
- g "github.com/onsi/ginkgo/v2"
- gm "github.com/onsi/gomega"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -30,14 +30,15 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
- casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
- casesMeasureData
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
- casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ cases_measure_data
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+ cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ cases_stream_data
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
)
func TestIntegrationColdQuery(t *testing.T) {
- gm.RegisterFailHandler(g.Fail)
- g.RunSpecs(t, "Integration Query Cold Data Suite")
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Integration Query Cold Data Suite")
}
var (
@@ -46,25 +47,25 @@ var (
deferFunc func()
)
-var _ = g.SynchronizedBeforeSuite(func() []byte {
- gm.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
- })).To(gm.Succeed())
+ })).To(Succeed())
var addr string
- addr, deferFunc = setup.SetUp()
+ addr, _, deferFunc = setup.SetUp()
conn, err := grpclib.Dial(
addr,
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
- gm.Expect(err).NotTo(gm.HaveOccurred())
+ Expect(err).NotTo(HaveOccurred())
now = timestamp.NowMilli().Add(-time.Hour * 24)
interval := 500 * time.Millisecond
- casesStream.Write(conn, "data.json", now, interval)
- casesMeasureData.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
- casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
- casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
- gm.Expect(conn.Close()).To(gm.Succeed())
+ cases_stream_data.Write(conn, "data.json", now, interval)
+ cases_measure_data.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
+ cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
+ cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
+ Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
var err error
@@ -73,20 +74,20 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
grpclib.WithTransportCredentials(insecure.NewCredentials()),
grpclib.WithBlock(),
)
- casesStream.SharedContext = helpers.SharedContext{
+ cases_stream.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- casesMeasure.SharedContext = helpers.SharedContext{
+ cases_measure.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
- gm.Expect(err).NotTo(gm.HaveOccurred())
+ Expect(err).NotTo(HaveOccurred())
})
-var _ = g.SynchronizedAfterSuite(func() {
+var _ = SynchronizedAfterSuite(func() {
if connection != nil {
- gm.Expect(connection.Close()).To(gm.Succeed())
+ Expect(connection.Close()).To(Succeed())
}
}, func() {
deferFunc()
diff --git a/test/integration/other/measure_test.go
b/test/integration/other/measure_test.go
index f162e64..d1c0e55 100644
--- a/test/integration/other/measure_test.go
+++ b/test/integration/other/measure_test.go
@@ -39,7 +39,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
g.BeforeEach(func() {
var addr string
- addr, deferFn = setup.SetUp()
+ addr, _, deferFn = setup.SetUp()
var err error
conn, err = grpclib.Dial(
addr,
diff --git a/test/integration/other/property_test.go
b/test/integration/other/property_test.go
index 37282b7..3a1d5ff 100644
--- a/test/integration/other/property_test.go
+++ b/test/integration/other/property_test.go
@@ -38,7 +38,7 @@ var _ = Describe("Property application", func() {
BeforeEach(func() {
var addr string
- addr, deferFn = setup.SetUp()
+ addr, _, deferFn = setup.SetUp()
var err error
conn, err = grpclib.Dial(
addr,
@@ -101,7 +101,7 @@ var _ = Describe("Property application", func() {
BeforeEach(func() {
var addr string
- addr, deferFn = setup.SetUp()
+ addr, _, deferFn = setup.SetUp()
var err error
conn, err = grpclib.Dial(
addr,
diff --git a/test/integration/other/tls_test.go
b/test/integration/other/tls_test.go
index 97fc855..f363ae5 100644
--- a/test/integration/other/tls_test.go
+++ b/test/integration/other/tls_test.go
@@ -45,7 +45,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
keyFile := filepath.Join(basePath, "testdata/server_key.pem")
var addr string
- addr, deferFn = setup.SetUp("--tls=true",
"--cert-file="+certFile, "--key-file="+keyFile)
+ addr, _, deferFn = setup.SetUp("--tls=true",
"--cert-file="+certFile, "--key-file="+keyFile)
var err error
creds, err := credentials.NewClientTLSFromFile(certFile,
"localhost")
gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/integration/query/query_suite_test.go
b/test/integration/query/query_suite_test.go
index 3579482..b1a3092 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -21,8 +21,8 @@ import (
"testing"
"time"
- g "github.com/onsi/ginkgo/v2"
- gm "github.com/onsi/gomega"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -33,11 +33,12 @@ import (
cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
cases_measure_data
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ cases_stream_data
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
)
func TestIntegrationQuery(t *testing.T) {
- gm.RegisterFailHandler(g.Fail)
- g.RunSpecs(t, "Integration Query Suite")
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Integration Query Suite")
}
var (
@@ -46,25 +47,25 @@ var (
deferFunc func()
)
-var _ = g.SynchronizedBeforeSuite(func() []byte {
- gm.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
- })).To(gm.Succeed())
+ })).To(Succeed())
var addr string
- addr, deferFunc = setup.SetUp()
+ addr, _, deferFunc = setup.SetUp()
conn, err := grpclib.Dial(
addr,
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
- gm.Expect(err).NotTo(gm.HaveOccurred())
+ Expect(err).NotTo(HaveOccurred())
now = timestamp.NowMilli()
interval := 500 * time.Millisecond
- cases_stream.Write(conn, "data.json", now, interval)
+ cases_stream_data.Write(conn, "data.json", now, interval)
cases_measure_data.Write(conn, "service_traffic", "sw_metric",
"service_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
- gm.Expect(conn.Close()).To(gm.Succeed())
+ Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
var err error
@@ -81,12 +82,12 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
Connection: connection,
BaseTime: now,
}
- gm.Expect(err).NotTo(gm.HaveOccurred())
+ Expect(err).NotTo(HaveOccurred())
})
-var _ = g.SynchronizedAfterSuite(func() {
+var _ = SynchronizedAfterSuite(func() {
if connection != nil {
- gm.Expect(connection.Close()).To(gm.Succeed())
+ Expect(connection.Close()).To(Succeed())
}
}, func() {
deferFunc()