This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a8c335a Add stream schema cli (#165)
a8c335a is described below
commit a8c335a7af60d680a60f51ced675be30f9cf03bb
Author: sacloud <[email protected]>
AuthorDate: Tue Sep 20 09:37:24 2022 +0800
Add stream schema cli (#165)
* add stream(crud) and use_group commands
Co-authored-by: Gao Hongtao <[email protected]>
---
.../main.go => banyand/liaison/http/health.go | 24 ++-
banyand/liaison/http/server.go | 19 +-
banyand/measure/service.go | 2 +-
banyand/metadata/schema/group.go | 2 +-
banyand/metadata/schema/stream.go | 5 +
banyand/stream/service.go | 5 +-
bydbctl/cmd/bydbctl/main.go | 2 +-
bydbctl/internal/cmd/group.go | 86 ++++++++
bydbctl/internal/cmd/rest.go | 156 ++++++++++++++
bydbctl/internal/cmd/root.go | 85 +++++++-
bydbctl/internal/cmd/stream.go | 163 ++++++++++++++
bydbctl/internal/cmd/stream_test.go | 234 +++++++++++++++++++++
bydbctl/internal/cmd/{root_test.go => use.go} | 29 ++-
bydbctl/pkg/file/read.go | 57 +++++
dist/LICENSE | 5 +-
dist/licenses/license-github.com-ghodss-yaml.txt | 50 +++++
.../license-github.com-go-resty-resty-v2.txt | 21 ++
.../license-github.com-zenizh-go-capturer.txt | 21 ++
go.mod | 5 +-
go.sum | 9 +-
pkg/schema/metadata.go | 79 +++----
.../main.go => pkg/test/helpers/http_health.go | 30 ++-
22 files changed, 1005 insertions(+), 84 deletions(-)
diff --git a/bydbctl/cmd/bydbctl/main.go b/banyand/liaison/http/health.go
similarity index 54%
copy from bydbctl/cmd/bydbctl/main.go
copy to banyand/liaison/http/health.go
index ebb144e..8adf613 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/banyand/liaison/http/health.go
@@ -15,19 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-// Package main provides main entry for the command-line toolkit, i.e. bydbctl
-package main
+package http
import (
- "fmt"
- "os"
+ "context"
- "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
)
-func main() {
- if err := cmd.NewRoot().Execute(); err != nil {
- _, _ = fmt.Fprintln(os.Stderr, err)
- os.Exit(1)
- }
+type healthCheckClient struct{}
+
+func (g *healthCheckClient) Check(ctx context.Context, r
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption)
(*grpc_health_v1.HealthCheckResponse, error) {
+ return &grpc_health_v1.HealthCheckResponse{Status:
grpc_health_v1.HealthCheckResponse_SERVING}, nil
+}
+
+func (g *healthCheckClient) Watch(ctx context.Context, r
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption)
(grpc_health_v1.Health_WatchClient, error) {
+ return nil, status.Error(codes.Unimplemented, "unimplemented")
}
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index f1737b7..b078211 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -60,6 +60,8 @@ type service struct {
stopCh chan struct{}
clientCloser context.CancelFunc
l *logger.Logger
+
+ srv *stdhttp.Server
}
func (p *service) FlagSet() *run.FlagSet {
@@ -90,7 +92,7 @@ func (p *service) PreRun() error {
serveIndex := serveFileContents("index.html", httpFS)
p.mux.Mount("/", intercept404(fileServer, serveIndex))
- gwMux := runtime.NewServeMux()
+ gwMux :=
runtime.NewServeMux(runtime.WithHealthzEndpoint(&healthCheckClient{}))
var ctx context.Context
ctx, p.clientCloser = context.WithCancel(context.Background())
@@ -116,22 +118,29 @@ func (p *service) PreRun() error {
return err
}
p.mux.Mount("/api", http.StripPrefix("/api", gwMux))
+ p.srv = &stdhttp.Server{
+ Addr: p.listenAddr,
+ Handler: p.mux,
+ }
return nil
}
func (p *service) Serve() run.StopNotify {
go func() {
p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison
http server")
- _ = stdhttp.ListenAndServe(p.listenAddr, p.mux)
- p.stopCh <- struct{}{}
+ if err := p.srv.ListenAndServe(); err != http.ErrServerClosed {
+ p.l.Error().Err(err)
+ }
+ close(p.stopCh)
}()
-
return p.stopCh
}
func (p *service) GracefulStop() {
+ if err := p.srv.Close(); err != nil {
+ p.l.Error().Err(err)
+ }
p.clientCloser()
- close(p.stopCh)
}
func intercept404(handler, on404 stdhttp.Handler) stdhttp.HandlerFunc {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 26219d2..c43a26c 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -142,7 +142,6 @@ func (s *service) Serve() run.StopNotify {
s.metadata.MeasureRegistry().RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule,
&s.schemaRepo)
- s.stopCh = make(chan struct{})
return s.stopCh
}
@@ -159,5 +158,6 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
metadata: metadata,
repo: repo,
pipeline: pipeline,
+ stopCh: make(chan struct{}),
}, nil
}
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index e1b0722..b62742b 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -37,7 +37,7 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) (*commo
var entity commonv1.Group
err := e.get(ctx, formatGroupKey(group), &entity)
if err != nil {
- return nil, err
+ return nil, errors.WithMessagef(err, "GetGroup[%s]", group)
}
return &entity, nil
}
diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go
index accc6d8..433f35e 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -65,6 +65,11 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev
}
func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream
*databasev1.Stream) error {
+ group := stream.Metadata.GetGroup()
+ _, err := e.GetGroup(ctx, group)
+ if err != nil {
+ return err
+ }
return e.create(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindStream,
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 2a7f555..184e8d3 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -133,12 +133,10 @@ func (s *service) PreRun() error {
func (s *service) Serve() run.StopNotify {
_ = s.schemaRepo.NotifyAll()
// run a serial watcher
- go s.schemaRepo.Watcher()
+ s.schemaRepo.Watcher()
s.metadata.StreamRegistry().RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
&s.schemaRepo)
-
- s.stopCh = make(chan struct{})
return s.stopCh
}
@@ -158,5 +156,6 @@ func NewService(_ context.Context, metadata metadata.Repo, repo discovery.Servic
dbOpts: tsdb.DatabaseOpts{
EnableGlobalIndex: true,
},
+ stopCh: make(chan struct{}),
}, nil
}
diff --git a/bydbctl/cmd/bydbctl/main.go b/bydbctl/cmd/bydbctl/main.go
index ebb144e..6cbbabc 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/bydbctl/cmd/bydbctl/main.go
@@ -26,7 +26,7 @@ import (
)
func main() {
- if err := cmd.NewRoot().Execute(); err != nil {
+ if err := cmd.Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go
new file mode 100644
index 0000000..b47d8a3
--- /dev/null
+++ b/bydbctl/internal/cmd/group.go
@@ -0,0 +1,86 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/apache/skywalking-banyandb/pkg/version"
+ "github.com/go-resty/resty/v2"
+ "github.com/spf13/cobra"
+
+ common_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func newGroupCmd() *cobra.Command {
+ groupCmd := &cobra.Command{
+ Use: "group",
+ Version: version.Build(),
+ Short: "Group operation",
+ }
+
+ createCmd := &cobra.Command{
+ Use: "create -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Create groups from files",
+ RunE: func(cmd *cobra.Command, args []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseNameFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ g := new(common_v1.Group)
+ err := json.Unmarshal(request.data, g)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&database_v1.GroupRegistryServiceCreateRequest{
+ Group: g,
+ }
+ b, err := json.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return
request.req.SetBody(b).Post(getPath("/api/v1/group/schema"))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("group %s is created",
reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
+ bindFileFlag(createCmd)
+
+ listCmd := &cobra.Command{
+ Use: "list",
+ Version: version.Build(),
+ Short: "list all groups",
+ PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
+ return cmd.Parent().PersistentPreRunE(cmd.Parent(),
args)
+ },
+ RunE: func(cmd *cobra.Command, args []string) (err error) {
+ return rest(nil, func(request request)
(*resty.Response, error) {
+ return
request.req.Get(getPath("/api/v1/group/schema/lists"))
+ }, yamlPrinter)
+ },
+ }
+
+ // todo:GroupGetCmd, GroupUpdateCmd, GroupDeleteCmd
+ groupCmd.AddCommand(createCmd, listCmd)
+ return groupCmd
+}
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
new file mode 100644
index 0000000..f6e746a
--- /dev/null
+++ b/bydbctl/internal/cmd/rest.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+
+ "github.com/go-resty/resty/v2"
+ "github.com/pkg/errors"
+ "github.com/spf13/viper"
+ "sigs.k8s.io/yaml"
+
+ "github.com/apache/skywalking-banyandb/bydbctl/pkg/file"
+)
+
+var errMalformedInput = errors.New("malformed input")
+
+type reqBody struct {
+ name string
+ group string
+ data []byte
+}
+
+type request struct {
+ req *resty.Request
+ reqBody
+}
+
+type reqFn func(request request) (*resty.Response, error)
+
+type paramsFn func() ([]reqBody, error)
+
+func parseNameAndGroupFromYAML(reader io.Reader) (requests []reqBody, err
error) {
+ return parseFromYAML(true, reader)
+}
+
+func parseNameFromYAML(reader io.Reader) (requests []reqBody, err error) {
+ return parseFromYAML(false, reader)
+}
+
+func parseFromYAML(tryParseGroup bool, reader io.Reader) (requests []reqBody,
err error) {
+ contents, err := file.Read(filePath, reader)
+ if err != nil {
+ return nil, err
+ }
+ for _, c := range contents {
+ j, err := yaml.YAMLToJSON(c)
+ if err != nil {
+ return nil, err
+ }
+ var data map[string]interface{}
+ err = json.Unmarshal(j, &data)
+ if err != nil {
+ return nil, err
+ }
+ metadata, ok := data["metadata"].(map[string]interface{})
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: metadata")
+ }
+ group, ok := metadata["group"].(string)
+ if !ok && tryParseGroup {
+ group = viper.GetString("group")
+ if group == "" {
+ return nil, errors.New("please specify a group
through the input json or the config file")
+ }
+ metadata["group"] = group
+ }
+ name, ok = metadata["name"].(string)
+ if !ok {
+ return nil, errors.WithMessage(errMalformedInput,
"absent node: name in metadata")
+ }
+ requests = append(requests, reqBody{
+ name: name,
+ group: group,
+ data: j,
+ })
+ }
+ return requests, nil
+}
+
+func parseFromFlags() (requests []reqBody, err error) {
+ if requests, err = parseGroupFromFlags(); err != nil {
+ return nil, err
+ }
+ requests[0].name = name
+ return requests, nil
+}
+
+func parseGroupFromFlags() ([]reqBody, error) {
+ group := viper.GetString("group")
+ if group == "" {
+ return nil, errors.New("please specify a group through the flag
or the config file")
+ }
+ return []reqBody{{group: group}}, nil
+}
+
+type printer func(index int, reqBody reqBody, body []byte) error
+
+func yamlPrinter(index int, _ reqBody, body []byte) error {
+ yamlResult, err := yaml.JSONToYAML(body)
+ if err != nil {
+ return err
+ }
+ if index > 0 {
+ fmt.Println("---")
+ }
+ fmt.Print(string(yamlResult))
+ fmt.Println()
+ return nil
+}
+
+func rest(pfn paramsFn, fn reqFn, printer printer) (err error) {
+ var requests []reqBody
+ if pfn == nil {
+ requests = []reqBody{{}}
+ } else {
+ requests, err = pfn()
+ if err != nil {
+ return err
+ }
+ }
+
+ for i, r := range requests {
+ req := resty.New().R()
+ resp, err := fn(request{
+ reqBody: r,
+ req: req,
+ })
+ if err != nil {
+ return err
+ }
+ err = printer(i, r, resp.Body())
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index 5f546e2..c7fd7e3 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -19,18 +19,91 @@
package cmd
import (
- "github.com/spf13/cobra"
+ "fmt"
+ "os"
"github.com/apache/skywalking-banyandb/pkg/version"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
)
-// NewRoot returns the root command
-func NewRoot() *cobra.Command {
- cmd := &cobra.Command{
+var (
+ filePath string
+ name string
+ cfgFile string
+ rootCmd = &cobra.Command{
DisableAutoGenTag: true,
Version: version.Build(),
Short: "bydbctl is the command line tool of
BanyanDB",
}
- // cmd.AddCommand(...)
- return cmd
+)
+
+// Execute executes the root command.
+func Execute() error {
+ return rootCmd.Execute()
+}
+
+// RootCmdFlags bind flags to a command.
+func RootCmdFlags(command *cobra.Command) {
+ command.PersistentFlags().StringVar(&cfgFile, "config", "", "config
file (default is $HOME/.bydbctl.yaml)")
+ command.PersistentFlags().StringP("group", "g", "", "If present, list
objects in this group.")
+ command.PersistentFlags().StringP("addr", "a", "", "Server's address,
the format is Schema://Domain:Port")
+ _ = viper.BindPFlag("group", command.PersistentFlags().Lookup("group"))
+ _ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
+ viper.SetDefault("addr", "http://localhost:17913")
+
+ command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd())
+}
+
+func init() {
+ cobra.OnInitialize(initConfig)
+ RootCmdFlags(rootCmd)
+}
+
+func initConfig() {
+ if cfgFile != "" {
+ // Use config file from the flag.
+ viper.SetConfigFile(cfgFile)
+ } else {
+ // Find home directory.
+ home, err := os.UserHomeDir()
+ cobra.CheckErr(err)
+
+ // Search config in home directory with name ".bydbctl" (without extension).
+ viper.AddConfigPath(home)
+ viper.SetConfigType("yaml")
+ viper.SetConfigName(".bydbctl")
+ }
+
+ viper.AutomaticEnv()
+
+ readCfg := func() error {
+ if err := viper.ReadInConfig(); err != nil {
+ return err
+ }
+ // Dump this to stderr in case of mixing up response yaml
+ fmt.Fprintln(os.Stderr, "Using config file:",
viper.ConfigFileUsed())
+ return nil
+ }
+
+ if err := readCfg(); err != nil {
+ if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
+ cobra.CheckErr(err)
+ }
+ cobra.CheckErr(viper.SafeWriteConfig())
+ cobra.CheckErr(readCfg())
+ }
+}
+
+func bindFileFlag(commands ...*cobra.Command) {
+ for _, c := range commands {
+ c.Flags().StringVarP(&filePath, "file", "f", "", "That contains
the request to send")
+ }
+}
+
+func bindNameFlag(commands ...*cobra.Command) {
+ for _, c := range commands {
+ c.Flags().StringVarP(&name, "name", "n", "", "the name of the
resource")
+ _ = c.MarkFlagRequired("name")
+ }
}
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
new file mode 100644
index 0000000..00316ad
--- /dev/null
+++ b/bydbctl/internal/cmd/stream.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/apache/skywalking-banyandb/pkg/version"
+ "github.com/go-resty/resty/v2"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+const streamSchemaPath = "/api/v1/stream/schema"
+
+var streamSchemaPathWithParams = streamSchemaPath + "/{group}/{name}"
+
+func getPath(path string) string {
+ return viper.GetString("addr") + path
+}
+
+func newStreamCmd() *cobra.Command {
+ streamCmd := &cobra.Command{
+ Use: "stream",
+ Version: version.Build(),
+ Short: "Stream operation",
+ }
+
+ createCmd := &cobra.Command{
+ Use: "create -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Create streams from files",
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ s := new(database_v1.Stream)
+ err :=
protojson.Unmarshal(request.data, s)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&database_v1.StreamRegistryServiceCreateRequest{
+ Stream: s,
+ }
+ b, err := json.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return
request.req.SetBody(b).Post(getPath(streamSchemaPath))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("stream %s.%s is created",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
+
+ updateCmd := &cobra.Command{
+ Use: "update -f [file|dir|-]",
+ Version: version.Build(),
+ Short: "Update streams from files",
+ RunE: func(cmd *cobra.Command, _ []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ s := new(database_v1.Stream)
+ err :=
protojson.Unmarshal(request.data, s)
+ if err != nil {
+ return nil, err
+ }
+ cr :=
&database_v1.StreamRegistryServiceUpdateRequest{
+ Stream: s,
+ }
+ b, err := json.Marshal(cr)
+ if err != nil {
+ return nil, err
+ }
+ return request.req.SetBody(b).
+ SetPathParam("name",
request.name).SetPathParam("group", request.group).
+
Put(getPath(streamSchemaPathWithParams))
+ },
+ func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("stream %s.%s is updated",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
+ bindFileFlag(createCmd, updateCmd)
+
+ getCmd := &cobra.Command{
+ Use: "get [-g group] -n name",
+ Version: version.Build(),
+ Short: "Get a stream",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Get(getPath(streamSchemaPathWithParams))
+ }, yamlPrinter)
+ },
+ }
+
+ deleteCmd := &cobra.Command{
+ Use: "delete [-g group] -n name",
+ Version: version.Build(),
+ Short: "Delete a stream",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("name",
request.name).SetPathParam("group",
request.group).Delete(getPath(streamSchemaPathWithParams))
+ }, func(_ int, reqBody reqBody, _ []byte) error {
+ fmt.Printf("stream %s.%s is deleted",
reqBody.group, reqBody.name)
+ fmt.Println()
+ return nil
+ })
+ },
+ }
+ bindNameFlag(getCmd, deleteCmd)
+
+ listCmd := &cobra.Command{
+ Use: "list [-g group]",
+ Version: version.Build(),
+ Short: "List streams",
+ RunE: func(_ *cobra.Command, _ []string) (err error) {
+ return rest(parseFromFlags, func(request request)
(*resty.Response, error) {
+ return request.req.SetPathParam("group",
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+ }, yamlPrinter)
+ },
+ }
+
+ queryCmd := &cobra.Command{
+ Use: "query",
+ Version: version.Build(),
+ Short: "Query data in a stream",
+ PersistentPreRunE: func(cmd *cobra.Command, args []string) (err
error) {
+ return cmd.Parent().PersistentPreRunE(cmd.Parent(),
args)
+ },
+ RunE: func(cmd *cobra.Command, _ []string) (err error) {
+ return rest(func() ([]reqBody, error) { return
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+ func(request request) (*resty.Response, error) {
+ return
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+ }, yamlPrinter)
+ },
+ }
+ streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd,
queryCmd)
+ return streamCmd
+}
diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go
new file mode 100644
index 0000000..046abaa
--- /dev/null
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -0,0 +1,234 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/http"
+ "github.com/apache/skywalking-banyandb/banyand/measure"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/query"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/ghodss/yaml"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/spf13/cobra"
+ "github.com/zenizh/go-capturer"
+
+ database_v1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var _ = Describe("Stream", func() {
+ var path string
+ var gracefulStop, deferFunc func()
+ var listenClientURL, listenPeerURL string
+ var rootCmd *cobra.Command
+ BeforeEach(func() {
+ var err error
+ path, deferFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ listenClientURL, listenPeerURL, err = test.NewEtcdListenUrls()
+ Expect(err).NotTo(HaveOccurred())
+ flags := []string{
+ "--stream-root-path=" + path, "--measure-root-path=" +
path, "--metadata-root-path=" + path,
+ "--etcd-listen-client-url=" + listenClientURL,
"--etcd-listen-peer-url=" + listenPeerURL,
+ }
+ gracefulStop = setup(false, flags)
+ Eventually(helpers.HTTPHealthCheck("localhost:17913"),
10*time.Second).Should(Succeed())
+ time.Sleep(1 * time.Second)
+ // extracting the operation of creating stream schema
+ rootCmd = &cobra.Command{Use: "root"}
+ cmd.RootCmdFlags(rootCmd)
+ rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: group1`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("group group1 is created"))
+ rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1`))
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("stream group1.name1 is
created"))
+ })
+
+ It("get stream schema", func() {
+ rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n",
"name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.StreamRegistryServiceGetResponse)
+ Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+ Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
+ Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
+ })
+
+ It("update stream schema", func() {
+ rootCmd.SetArgs([]string{"stream", "update", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name1
+ group: group1
+entity:
+ tagNames: ["tag1"]`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("stream group1.name1 is
updated"))
+ rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n",
"name1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.StreamRegistryServiceGetResponse)
+ helpers.UnmarshalYAML([]byte(out), resp)
+ Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
+ Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
+ Expect(resp.Stream.Entity.TagNames[0]).To(Equal("tag1"))
+ })
+
+ It("delete stream schema", func() {
+ // delete
+ rootCmd.SetArgs([]string{"stream", "delete", "-g", "group1",
"-n", "name1"})
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("stream group1.name1 is
deleted"))
+ // get again
+ rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n",
"name1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("resource not found"))
+ })
+
+ It("list stream schema", func() {
+ // create another stream schema for list operation
+ rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+ rootCmd.SetIn(strings.NewReader(`
+metadata:
+ name: name2
+ group: group1`))
+ out := capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Expect(out).To(ContainSubstring("stream group1.name2 is
created"))
+ // list
+ rootCmd.SetArgs([]string{"stream", "list", "-g", "group1"})
+ out = capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ Expect(err).NotTo(HaveOccurred())
+ })
+ resp := new(database_v1.StreamRegistryServiceListResponse)
+ Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+ Expect(resp.Stream).To(HaveLen(2))
+ })
+
+ AfterEach(func() {
+ gracefulStop()
+ deferFunc()
+ })
+})
+
+func setup(loadMetadata bool, flags []string) func() {
+ // Init `Discovery` module
+ repo, err := discovery.NewServiceRepo(context.Background())
+ Expect(err).NotTo(HaveOccurred())
+ // Init `Queue` module
+ pipeline, err := queue.NewQueue(context.TODO(), repo)
+ Expect(err).NotTo(HaveOccurred())
+ // Init `Metadata` module
+ metaSvc, err := metadata.NewService(context.TODO())
+ Expect(err).NotTo(HaveOccurred())
+ // Init `Stream` module
+ streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo,
pipeline)
+ Expect(err).NotTo(HaveOccurred())
+ // Init `Measure` module
+ measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo,
pipeline)
+ Expect(err).NotTo(HaveOccurred())
+ // Init `Query` module
+ q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc,
metaSvc, repo, pipeline)
+ Expect(err).NotTo(HaveOccurred())
+ tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+
+ httpServer := http.NewService()
+ if loadMetadata {
+ return test.SetUpModules(
+ flags,
+ repo,
+ pipeline,
+ metaSvc,
+ &preloadStreamService{metaSvc: metaSvc},
+ &preloadMeasureService{metaSvc: metaSvc},
+ streamSvc,
+ measureSvc,
+ q,
+ tcp,
+ httpServer,
+ )
+ }
+ return test.SetUpModules(
+ flags,
+ repo,
+ pipeline,
+ metaSvc,
+ streamSvc,
+ measureSvc,
+ q,
+ tcp,
+ httpServer,
+ )
+}
+
+type preloadStreamService struct {
+ metaSvc metadata.Service
+}
+
+type preloadMeasureService struct {
+ metaSvc metadata.Service
+}
+
+func (p *preloadStreamService) Name() string {
+ return "preload-stream"
+}
+
+func (p *preloadMeasureService) Name() string {
+ return "preload-measure"
+}
diff --git a/bydbctl/internal/cmd/root_test.go b/bydbctl/internal/cmd/use.go
similarity index 60%
rename from bydbctl/internal/cmd/root_test.go
rename to bydbctl/internal/cmd/use.go
index 943ca0a..a4429bf 100644
--- a/bydbctl/internal/cmd/root_test.go
+++ b/bydbctl/internal/cmd/use.go
@@ -15,13 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package cmd_test
+package cmd
import (
- . "github.com/onsi/ginkgo/v2"
+ "fmt"
+
+ "github.com/apache/skywalking-banyandb/pkg/version"
+ "github.com/spf13/cobra"
+ "github.com/spf13/viper"
)
-var _ = Describe("Root", func() {
- It("foo", func() {
- })
-})
+func newUserCmd() *cobra.Command {
+ return &cobra.Command{
+ Use: "use group",
+ Version: version.Build(),
+ Short: "Select a group",
+ Args: cobra.ExactArgs(1),
+ RunE: func(cmd *cobra.Command, args []string) (err error) {
+ viper.Set("group", args[0])
+ err = viper.WriteConfig()
+ if err != nil {
+ return err
+ }
+ fmt.Printf("Switched to [%s]", viper.GetString("group"))
+ return nil
+ },
+ }
+}
diff --git a/bydbctl/pkg/file/read.go b/bydbctl/pkg/file/read.go
new file mode 100644
index 0000000..f4708b1
--- /dev/null
+++ b/bydbctl/pkg/file/read.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file provides utils to handle files
+package file
+
+import (
+ "bufio"
+ "io"
+ "os"
+ "path/filepath"
+)
+
+// Read bytes from given file or stdin (in case that path is `-`).
+func Read(path string, reader io.Reader) (contents [][]byte, err error) {
+ var b []byte
+ if path == "-" {
+ b, err = io.ReadAll(bufio.NewReader(reader))
+ return append(contents, b), err
+ }
+ fileInfo, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ if !fileInfo.IsDir() {
+ b, err = os.ReadFile(path)
+ return append(contents, b), err
+ }
+ err = filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err // prevent panic from failed accessing path
+ }
+ if filepath.Ext(path) == ".yml" || filepath.Ext(path) == ".yaml" {
+ content, err := os.ReadFile(path)
+ if err != nil {
+ return err
+ }
+ contents = append(contents, content)
+ }
+ return nil
+ })
+ return contents, err
+}
diff --git a/dist/LICENSE b/dist/LICENSE
index 090386f..0143495 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -301,6 +301,7 @@ MIT licenses
github.com/dustin/go-humanize v1.0.0 MIT
github.com/form3tech-oss/jwt-go v3.2.3+incompatible MIT
github.com/go-chi/chi/v5 v5.0.7 MIT
+ github.com/go-resty/resty/v2 v2.7.0 MIT
github.com/json-iterator/go v1.1.12 MIT
github.com/mitchellh/mapstructure v1.5.0 MIT
github.com/onsi/ginkgo/v2 v2.1.4 MIT
@@ -316,8 +317,9 @@ MIT licenses
github.com/subosito/gotenv v1.3.0 MIT
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT
+ github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 MIT
go.etcd.io/bbolt v1.3.6 MIT
- go.uber.org/atomic v1.7.0 MIT
+ go.uber.org/atomic v1.9.0 MIT
go.uber.org/multierr v1.8.0 MIT
go.uber.org/zap v1.17.0 MIT
gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT
@@ -333,6 +335,7 @@ MIT and Apache-2.0 licenses
MIT and BSD-3-Clause licenses
========================================================================
+ github.com/ghodss/yaml v1.0.0 MIT and BSD-3-Clause
sigs.k8s.io/yaml v1.2.0 MIT and BSD-3-Clause
========================================================================
diff --git a/dist/licenses/license-github.com-ghodss-yaml.txt b/dist/licenses/license-github.com-ghodss-yaml.txt
new file mode 100644
index 0000000..7805d36
--- /dev/null
+++ b/dist/licenses/license-github.com-ghodss-yaml.txt
@@ -0,0 +1,50 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Sam Ghods
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
+
+Copyright (c) 2012 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dist/licenses/license-github.com-go-resty-resty-v2.txt b/dist/licenses/license-github.com-go-resty-resty-v2.txt
new file mode 100644
index 0000000..27326a6
--- /dev/null
+++ b/dist/licenses/license-github.com-go-resty-resty-v2.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015-2021 Jeevanandam M., https://myjeeva.com <[email protected]>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/dist/licenses/license-github.com-zenizh-go-capturer.txt b/dist/licenses/license-github.com-zenizh-go-capturer.txt
new file mode 100644
index 0000000..549d46b
--- /dev/null
+++ b/dist/licenses/license-github.com-zenizh-go-capturer.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 kami
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/go.mod b/go.mod
index 1032999..367d4a1 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,9 @@ require (
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/dgraph-io/ristretto v0.1.0
github.com/envoyproxy/protoc-gen-validate v0.1.0
+ github.com/ghodss/yaml v1.0.0
github.com/go-chi/chi/v5 v5.0.7
+ github.com/go-resty/resty/v2 v2.7.0
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
@@ -28,6 +30,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.7.1
+ github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
go.uber.org/multierr v1.8.0
@@ -112,7 +115,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
- go.uber.org/atomic v1.7.0 // indirect
+ go.uber.org/atomic v1.9.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
diff --git a/go.sum b/go.sum
index b62e59b..d4f717b 100644
--- a/go.sum
+++ b/go.sum
@@ -172,6 +172,7 @@ github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwV
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
+github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
@@ -184,6 +185,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
+github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -500,6 +503,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w=
+github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
@@ -548,8 +553,9 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g
go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -658,6 +664,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y=
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index eb7b8aa..90400ec 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -20,6 +20,7 @@ package schema
import (
"context"
"io"
+ "runtime"
"sync"
"sync/atomic"
"time"
@@ -141,45 +142,50 @@ func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
}
func (sr *schemaRepo) Watcher() {
- defer func() {
- if err := recover(); err != nil {
- sr.l.Warn().Interface("err", err).Msg("watching the events")
- }
- }()
- for {
- select {
- case evt, more := <-sr.eventCh:
- if !more {
- return
- }
- sr.l.Info().Interface("event", evt).Msg("received an event")
- for i := 0; i < 10; i++ {
- var err error
- switch evt.Typ {
- case EventAddOrUpdate:
- switch evt.Kind {
- case EventKindGroup:
- _, err = sr.StoreGroup(evt.Metadata)
- case EventKindResource:
- _, err = sr.storeResource(evt.Metadata)
+ for i := 0; i < 10; i++ {
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ sr.l.Warn().Interface("err", err).Msg("watching the events")
+ }
+ }()
+ for {
+ select {
+ case evt, more := <-sr.eventCh:
+ if !more {
+ return
}
- case EventDelete:
- switch evt.Kind {
- case EventKindGroup:
- err = sr.deleteGroup(evt.Metadata)
- case EventKindResource:
- err = sr.deleteResource(evt.Metadata)
+ sr.l.Info().Interface("event", evt).Msg("received an event")
+ for i := 0; i < 10; i++ {
+ var err error
+ switch evt.Typ {
+ case EventAddOrUpdate:
+ switch evt.Kind {
+ case EventKindGroup:
+ _, err = sr.StoreGroup(evt.Metadata)
+ case EventKindResource:
+ _, err = sr.storeResource(evt.Metadata)
+ }
+ case EventDelete:
+ switch evt.Kind {
+ case EventKindGroup:
+ err = sr.deleteGroup(evt.Metadata)
+ case EventKindResource:
+ err = sr.deleteResource(evt.Metadata)
+ }
+ }
+ if err == nil {
+ break
+ }
+ runtime.Gosched()
+ time.Sleep(time.Second)
+ sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. retry...")
}
+ case <-sr.workerStopCh:
+ return
}
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. retry...")
}
- case <-sr.workerStopCh:
- return
- }
+ }()
}
}
@@ -344,9 +350,6 @@ func (sr *schemaRepo) Close() {
sr.l.Warn().Interface("err", err).Msg("closing resource")
}
}()
- if sr.eventCh != nil {
- close(sr.eventCh)
- }
if sr.workerStopCh != nil {
close(sr.workerStopCh)
}
diff --git a/bydbctl/cmd/bydbctl/main.go b/pkg/test/helpers/http_health.go
similarity index 58%
copy from bydbctl/cmd/bydbctl/main.go
copy to pkg/test/helpers/http_health.go
index ebb144e..ec6aa2f 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/pkg/test/helpers/http_health.go
@@ -6,7 +6,7 @@
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
@@ -14,20 +14,30 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-// Package main provides main entry for the command-line toolkit, i.e. bydbctl
-package main
+package helpers
import (
"fmt"
- "os"
- "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+ "github.com/go-resty/resty/v2"
)
-func main() {
- if err := cmd.NewRoot().Execute(); err != nil {
- _, _ = fmt.Fprintln(os.Stderr, err)
- os.Exit(1)
+func HTTPHealthCheck(addr string) func() error {
+ return func() error {
+ client := resty.New()
+
+ resp, err := client.R().
+ SetHeader("Accept", "application/json").
+ Get(fmt.Sprintf("http://%s/api/healthz", addr))
+ if err != nil {
+ return err
+ }
+
+ if resp.StatusCode() != 200 {
+ l.Warn().Str("responded_status", resp.Status()).Msg("service unhealthy")
+ return ErrServiceUnhealthy
+ }
+ l.Info().Stringer("response", resp).Msg("connected")
+ return nil
}
}