This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a8c335a  Add stream schema cli (#165)
a8c335a is described below

commit a8c335a7af60d680a60f51ced675be30f9cf03bb
Author: sacloud <[email protected]>
AuthorDate: Tue Sep 20 09:37:24 2022 +0800

    Add stream schema cli (#165)
    
    * add stream(crud) and use_group commands
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 .../main.go => banyand/liaison/http/health.go      |  24 ++-
 banyand/liaison/http/server.go                     |  19 +-
 banyand/measure/service.go                         |   2 +-
 banyand/metadata/schema/group.go                   |   2 +-
 banyand/metadata/schema/stream.go                  |   5 +
 banyand/stream/service.go                          |   5 +-
 bydbctl/cmd/bydbctl/main.go                        |   2 +-
 bydbctl/internal/cmd/group.go                      |  86 ++++++++
 bydbctl/internal/cmd/rest.go                       | 156 ++++++++++++++
 bydbctl/internal/cmd/root.go                       |  85 +++++++-
 bydbctl/internal/cmd/stream.go                     | 163 ++++++++++++++
 bydbctl/internal/cmd/stream_test.go                | 234 +++++++++++++++++++++
 bydbctl/internal/cmd/{root_test.go => use.go}      |  29 ++-
 bydbctl/pkg/file/read.go                           |  57 +++++
 dist/LICENSE                                       |   5 +-
 dist/licenses/license-github.com-ghodss-yaml.txt   |  50 +++++
 .../license-github.com-go-resty-resty-v2.txt       |  21 ++
 .../license-github.com-zenizh-go-capturer.txt      |  21 ++
 go.mod                                             |   5 +-
 go.sum                                             |   9 +-
 pkg/schema/metadata.go                             |  79 +++----
 .../main.go => pkg/test/helpers/http_health.go     |  30 ++-
 22 files changed, 1005 insertions(+), 84 deletions(-)

diff --git a/bydbctl/cmd/bydbctl/main.go b/banyand/liaison/http/health.go
similarity index 54%
copy from bydbctl/cmd/bydbctl/main.go
copy to banyand/liaison/http/health.go
index ebb144e..8adf613 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/banyand/liaison/http/health.go
@@ -15,19 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package main provides main entry for the command-line toolkit, i.e. bydbctl
-package main
+package http
 
 import (
-       "fmt"
-       "os"
+       "context"
 
-       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
 )
 
-func main() {
-       if err := cmd.NewRoot().Execute(); err != nil {
-               _, _ = fmt.Fprintln(os.Stderr, err)
-               os.Exit(1)
-       }
+type healthCheckClient struct{}
+
+func (g *healthCheckClient) Check(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(*grpc_health_v1.HealthCheckResponse, error) {
+       return &grpc_health_v1.HealthCheckResponse{Status: 
grpc_health_v1.HealthCheckResponse_SERVING}, nil
+}
+
+func (g *healthCheckClient) Watch(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(grpc_health_v1.Health_WatchClient, error) {
+       return nil, status.Error(codes.Unimplemented, "unimplemented")
 }
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index f1737b7..b078211 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -60,6 +60,8 @@ type service struct {
        stopCh       chan struct{}
        clientCloser context.CancelFunc
        l            *logger.Logger
+
+       srv *stdhttp.Server
 }
 
 func (p *service) FlagSet() *run.FlagSet {
@@ -90,7 +92,7 @@ func (p *service) PreRun() error {
        serveIndex := serveFileContents("index.html", httpFS)
        p.mux.Mount("/", intercept404(fileServer, serveIndex))
 
-       gwMux := runtime.NewServeMux()
+       gwMux := 
runtime.NewServeMux(runtime.WithHealthzEndpoint(&healthCheckClient{}))
        var ctx context.Context
        ctx, p.clientCloser = context.WithCancel(context.Background())
 
@@ -116,22 +118,29 @@ func (p *service) PreRun() error {
                return err
        }
        p.mux.Mount("/api", http.StripPrefix("/api", gwMux))
+       p.srv = &stdhttp.Server{
+               Addr:    p.listenAddr,
+               Handler: p.mux,
+       }
        return nil
 }
 
 func (p *service) Serve() run.StopNotify {
        go func() {
                p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start liaison 
http server")
-               _ = stdhttp.ListenAndServe(p.listenAddr, p.mux)
-               p.stopCh <- struct{}{}
+               if err := p.srv.ListenAndServe(); err != http.ErrServerClosed {
+                       p.l.Error().Err(err)
+               }
+               close(p.stopCh)
        }()
-
        return p.stopCh
 }
 
 func (p *service) GracefulStop() {
+       if err := p.srv.Close(); err != nil {
+               p.l.Error().Err(err)
+       }
        p.clientCloser()
-       close(p.stopCh)
 }
 
 func intercept404(handler, on404 stdhttp.Handler) stdhttp.HandlerFunc {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 26219d2..c43a26c 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -142,7 +142,6 @@ func (s *service) Serve() run.StopNotify {
        
s.metadata.MeasureRegistry().RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule,
                &s.schemaRepo)
 
-       s.stopCh = make(chan struct{})
        return s.stopCh
 }
 
@@ -159,5 +158,6 @@ func NewService(_ context.Context, metadata metadata.Repo, 
repo discovery.Servic
                metadata: metadata,
                repo:     repo,
                pipeline: pipeline,
+               stopCh:   make(chan struct{}),
        }, nil
 }
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index e1b0722..b62742b 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -37,7 +37,7 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, 
group string) (*commo
        var entity commonv1.Group
        err := e.get(ctx, formatGroupKey(group), &entity)
        if err != nil {
-               return nil, err
+               return nil, errors.WithMessagef(err, "GetGroup[%s]", group)
        }
        return &entity, nil
 }
diff --git a/banyand/metadata/schema/stream.go 
b/banyand/metadata/schema/stream.go
index accc6d8..433f35e 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -65,6 +65,11 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx 
context.Context, stream *databasev
 }
 
 func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream 
*databasev1.Stream) error {
+       group := stream.Metadata.GetGroup()
+       _, err := e.GetGroup(ctx, group)
+       if err != nil {
+               return err
+       }
        return e.create(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind:  KindStream,
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 2a7f555..184e8d3 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -133,12 +133,10 @@ func (s *service) PreRun() error {
 func (s *service) Serve() run.StopNotify {
        _ = s.schemaRepo.NotifyAll()
        // run a serial watcher
-       go s.schemaRepo.Watcher()
+       s.schemaRepo.Watcher()
 
        
s.metadata.StreamRegistry().RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
                &s.schemaRepo)
-
-       s.stopCh = make(chan struct{})
        return s.stopCh
 }
 
@@ -158,5 +156,6 @@ func NewService(_ context.Context, metadata metadata.Repo, 
repo discovery.Servic
                dbOpts: tsdb.DatabaseOpts{
                        EnableGlobalIndex: true,
                },
+               stopCh: make(chan struct{}),
        }, nil
 }
diff --git a/bydbctl/cmd/bydbctl/main.go b/bydbctl/cmd/bydbctl/main.go
index ebb144e..6cbbabc 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/bydbctl/cmd/bydbctl/main.go
@@ -26,7 +26,7 @@ import (
 )
 
 func main() {
-       if err := cmd.NewRoot().Execute(); err != nil {
+       if err := cmd.Execute(); err != nil {
                _, _ = fmt.Fprintln(os.Stderr, err)
                os.Exit(1)
        }
diff --git a/bydbctl/internal/cmd/group.go b/bydbctl/internal/cmd/group.go
new file mode 100644
index 0000000..b47d8a3
--- /dev/null
+++ b/bydbctl/internal/cmd/group.go
@@ -0,0 +1,86 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+       "encoding/json"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/version"
+       "github.com/go-resty/resty/v2"
+       "github.com/spf13/cobra"
+
+       common_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+func newGroupCmd() *cobra.Command {
+       groupCmd := &cobra.Command{
+               Use:     "group",
+               Version: version.Build(),
+               Short:   "Group operation",
+       }
+
+       createCmd := &cobra.Command{
+               Use:     "create -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Create groups from files",
+               RunE: func(cmd *cobra.Command, args []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseNameFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       g := new(common_v1.Group)
+                                       err := json.Unmarshal(request.data, g)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&database_v1.GroupRegistryServiceCreateRequest{
+                                               Group: g,
+                                       }
+                                       b, err := json.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return 
request.req.SetBody(b).Post(getPath("/api/v1/group/schema"))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("group %s is created", 
reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               })
+               },
+       }
+       bindFileFlag(createCmd)
+
+       listCmd := &cobra.Command{
+               Use:     "list",
+               Version: version.Build(),
+               Short:   "list all groups",
+               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
+                       return cmd.Parent().PersistentPreRunE(cmd.Parent(), 
args)
+               },
+               RunE: func(cmd *cobra.Command, args []string) (err error) {
+                       return rest(nil, func(request request) 
(*resty.Response, error) {
+                               return 
request.req.Get(getPath("/api/v1/group/schema/lists"))
+                       }, yamlPrinter)
+               },
+       }
+
+       // todo:GroupGetCmd, GroupUpdateCmd, GroupDeleteCmd
+       groupCmd.AddCommand(createCmd, listCmd)
+       return groupCmd
+}
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
new file mode 100644
index 0000000..f6e746a
--- /dev/null
+++ b/bydbctl/internal/cmd/rest.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+
+       "github.com/go-resty/resty/v2"
+       "github.com/pkg/errors"
+       "github.com/spf13/viper"
+       "sigs.k8s.io/yaml"
+
+       "github.com/apache/skywalking-banyandb/bydbctl/pkg/file"
+)
+
+var errMalformedInput = errors.New("malformed input")
+
+type reqBody struct {
+       name  string
+       group string
+       data  []byte
+}
+
+type request struct {
+       req *resty.Request
+       reqBody
+}
+
+type reqFn func(request request) (*resty.Response, error)
+
+type paramsFn func() ([]reqBody, error)
+
+func parseNameAndGroupFromYAML(reader io.Reader) (requests []reqBody, err 
error) {
+       return parseFromYAML(true, reader)
+}
+
+func parseNameFromYAML(reader io.Reader) (requests []reqBody, err error) {
+       return parseFromYAML(false, reader)
+}
+
+func parseFromYAML(tryParseGroup bool, reader io.Reader) (requests []reqBody, 
err error) {
+       contents, err := file.Read(filePath, reader)
+       if err != nil {
+               return nil, err
+       }
+       for _, c := range contents {
+               j, err := yaml.YAMLToJSON(c)
+               if err != nil {
+                       return nil, err
+               }
+               var data map[string]interface{}
+               err = json.Unmarshal(j, &data)
+               if err != nil {
+                       return nil, err
+               }
+               metadata, ok := data["metadata"].(map[string]interface{})
+               if !ok {
+                       return nil, errors.WithMessage(errMalformedInput, 
"absent node: metadata")
+               }
+               group, ok := metadata["group"].(string)
+               if !ok && tryParseGroup {
+                       group = viper.GetString("group")
+                       if group == "" {
+                               return nil, errors.New("please specify a group 
through the input json or the config file")
+                       }
+                       metadata["group"] = group
+               }
+               name, ok = metadata["name"].(string)
+               if !ok {
+                       return nil, errors.WithMessage(errMalformedInput, 
"absent node: name in metadata")
+               }
+               requests = append(requests, reqBody{
+                       name:  name,
+                       group: group,
+                       data:  j,
+               })
+       }
+       return requests, nil
+}
+
+func parseFromFlags() (requests []reqBody, err error) {
+       if requests, err = parseGroupFromFlags(); err != nil {
+               return nil, err
+       }
+       requests[0].name = name
+       return requests, nil
+}
+
+func parseGroupFromFlags() ([]reqBody, error) {
+       group := viper.GetString("group")
+       if group == "" {
+               return nil, errors.New("please specify a group through the flag 
or the config file")
+       }
+       return []reqBody{{group: group}}, nil
+}
+
+type printer func(index int, reqBody reqBody, body []byte) error
+
+func yamlPrinter(index int, _ reqBody, body []byte) error {
+       yamlResult, err := yaml.JSONToYAML(body)
+       if err != nil {
+               return err
+       }
+       if index > 0 {
+               fmt.Println("---")
+       }
+       fmt.Print(string(yamlResult))
+       fmt.Println()
+       return nil
+}
+
+func rest(pfn paramsFn, fn reqFn, printer printer) (err error) {
+       var requests []reqBody
+       if pfn == nil {
+               requests = []reqBody{{}}
+       } else {
+               requests, err = pfn()
+               if err != nil {
+                       return err
+               }
+       }
+
+       for i, r := range requests {
+               req := resty.New().R()
+               resp, err := fn(request{
+                       reqBody: r,
+                       req:     req,
+               })
+               if err != nil {
+                       return err
+               }
+               err = printer(i, r, resp.Body())
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
diff --git a/bydbctl/internal/cmd/root.go b/bydbctl/internal/cmd/root.go
index 5f546e2..c7fd7e3 100644
--- a/bydbctl/internal/cmd/root.go
+++ b/bydbctl/internal/cmd/root.go
@@ -19,18 +19,91 @@
 package cmd
 
 import (
-       "github.com/spf13/cobra"
+       "fmt"
+       "os"
 
        "github.com/apache/skywalking-banyandb/pkg/version"
+       "github.com/spf13/cobra"
+       "github.com/spf13/viper"
 )
 
-// NewRoot returns the root command
-func NewRoot() *cobra.Command {
-       cmd := &cobra.Command{
+var (
+       filePath string
+       name     string
+       cfgFile  string
+       rootCmd  = &cobra.Command{
                DisableAutoGenTag: true,
                Version:           version.Build(),
                Short:             "bydbctl is the command line tool of 
BanyanDB",
        }
-       // cmd.AddCommand(...)
-       return cmd
+)
+
+// Execute executes the root command.
+func Execute() error {
+       return rootCmd.Execute()
+}
+
+// RootCmdFlags bind flags to a command.
+func RootCmdFlags(command *cobra.Command) {
+       command.PersistentFlags().StringVar(&cfgFile, "config", "", "config 
file (default is $HOME/.bydbctl.yaml)")
+       command.PersistentFlags().StringP("group", "g", "", "If present, list 
objects in this group.")
+       command.PersistentFlags().StringP("addr", "a", "", "Server's address, 
the format is Schema://Domain:Port")
+       _ = viper.BindPFlag("group", command.PersistentFlags().Lookup("group"))
+       _ = viper.BindPFlag("addr", command.PersistentFlags().Lookup("addr"))
+       viper.SetDefault("addr", "http://localhost:17913")
+
+       command.AddCommand(newGroupCmd(), newUserCmd(), newStreamCmd())
+}
+
+func init() {
+       cobra.OnInitialize(initConfig)
+       RootCmdFlags(rootCmd)
+}
+
+func initConfig() {
+       if cfgFile != "" {
+               // Use config file from the flag.
+               viper.SetConfigFile(cfgFile)
+       } else {
+               // Find home directory.
+               home, err := os.UserHomeDir()
+               cobra.CheckErr(err)
+
+               // Search config in home directory with name ".bydbctl" 
(without extension).
+               viper.AddConfigPath(home)
+               viper.SetConfigType("yaml")
+               viper.SetConfigName(".bydbctl")
+       }
+
+       viper.AutomaticEnv()
+
+       readCfg := func() error {
+               if err := viper.ReadInConfig(); err != nil {
+                       return err
+               }
+               // Dump this to stderr in case of mixing up response yaml
+               fmt.Fprintln(os.Stderr, "Using config file:", 
viper.ConfigFileUsed())
+               return nil
+       }
+
+       if err := readCfg(); err != nil {
+               if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
+                       cobra.CheckErr(err)
+               }
+               cobra.CheckErr(viper.SafeWriteConfig())
+               cobra.CheckErr(readCfg())
+       }
+}
+
+func bindFileFlag(commands ...*cobra.Command) {
+       for _, c := range commands {
+               c.Flags().StringVarP(&filePath, "file", "f", "", "That contains 
the request to send")
+       }
+}
+
+func bindNameFlag(commands ...*cobra.Command) {
+       for _, c := range commands {
+               c.Flags().StringVarP(&name, "name", "n", "", "the name of the 
resource")
+               _ = c.MarkFlagRequired("name")
+       }
 }
diff --git a/bydbctl/internal/cmd/stream.go b/bydbctl/internal/cmd/stream.go
new file mode 100644
index 0000000..00316ad
--- /dev/null
+++ b/bydbctl/internal/cmd/stream.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+       "encoding/json"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/version"
+       "github.com/go-resty/resty/v2"
+       "github.com/spf13/cobra"
+       "github.com/spf13/viper"
+       "google.golang.org/protobuf/encoding/protojson"
+
+       database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+const streamSchemaPath = "/api/v1/stream/schema"
+
+var streamSchemaPathWithParams = streamSchemaPath + "/{group}/{name}"
+
+func getPath(path string) string {
+       return viper.GetString("addr") + path
+}
+
+func newStreamCmd() *cobra.Command {
+       streamCmd := &cobra.Command{
+               Use:     "stream",
+               Version: version.Build(),
+               Short:   "Stream operation",
+       }
+
+       createCmd := &cobra.Command{
+               Use:     "create -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Create streams from files",
+               RunE: func(cmd *cobra.Command, _ []string) error {
+                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       s := new(database_v1.Stream)
+                                       err := 
protojson.Unmarshal(request.data, s)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&database_v1.StreamRegistryServiceCreateRequest{
+                                               Stream: s,
+                                       }
+                                       b, err := json.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return 
request.req.SetBody(b).Post(getPath(streamSchemaPath))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("stream %s.%s is created", 
reqBody.group, reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               })
+               },
+       }
+
+       updateCmd := &cobra.Command{
+               Use:     "update -f [file|dir|-]",
+               Version: version.Build(),
+               Short:   "Update streams from files",
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       s := new(database_v1.Stream)
+                                       err := 
protojson.Unmarshal(request.data, s)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       cr := 
&database_v1.StreamRegistryServiceUpdateRequest{
+                                               Stream: s,
+                                       }
+                                       b, err := json.Marshal(cr)
+                                       if err != nil {
+                                               return nil, err
+                                       }
+                                       return request.req.SetBody(b).
+                                               SetPathParam("name", 
request.name).SetPathParam("group", request.group).
+                                               
Put(getPath(streamSchemaPathWithParams))
+                               },
+                               func(_ int, reqBody reqBody, _ []byte) error {
+                                       fmt.Printf("stream %s.%s is updated", 
reqBody.group, reqBody.name)
+                                       fmt.Println()
+                                       return nil
+                               })
+               },
+       }
+       bindFileFlag(createCmd, updateCmd)
+
+       getCmd := &cobra.Command{
+               Use:     "get [-g group] -n name",
+               Version: version.Build(),
+               Short:   "Get a stream",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Get(getPath(streamSchemaPathWithParams))
+                       }, yamlPrinter)
+               },
+       }
+
+       deleteCmd := &cobra.Command{
+               Use:     "delete [-g group] -n name",
+               Version: version.Build(),
+               Short:   "Delete a stream",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("name", 
request.name).SetPathParam("group", 
request.group).Delete(getPath(streamSchemaPathWithParams))
+                       }, func(_ int, reqBody reqBody, _ []byte) error {
+                               fmt.Printf("stream %s.%s is deleted", 
reqBody.group, reqBody.name)
+                               fmt.Println()
+                               return nil
+                       })
+               },
+       }
+       bindNameFlag(getCmd, deleteCmd)
+
+       listCmd := &cobra.Command{
+               Use:     "list [-g group]",
+               Version: version.Build(),
+               Short:   "List streams",
+               RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       return rest(parseFromFlags, func(request request) 
(*resty.Response, error) {
+                               return request.req.SetPathParam("group", 
request.group).Get(getPath("/api/v1/stream/schema/lists/{group}"))
+                       }, yamlPrinter)
+               },
+       }
+
+       queryCmd := &cobra.Command{
+               Use:     "query",
+               Version: version.Build(),
+               Short:   "Query data in a stream",
+               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
+                       return cmd.Parent().PersistentPreRunE(cmd.Parent(), 
args)
+               },
+               RunE: func(cmd *cobra.Command, _ []string) (err error) {
+                       return rest(func() ([]reqBody, error) { return 
parseNameAndGroupFromYAML(cmd.InOrStdin()) },
+                               func(request request) (*resty.Response, error) {
+                                       return 
request.req.SetBody(request.data).Post(getPath("/api/v1/stream/data"))
+                               }, yamlPrinter)
+               },
+       }
+       streamCmd.AddCommand(getCmd, createCmd, deleteCmd, updateCmd, listCmd, 
queryCmd)
+       return streamCmd
+}
diff --git a/bydbctl/internal/cmd/stream_test.go 
b/bydbctl/internal/cmd/stream_test.go
new file mode 100644
index 0000000..046abaa
--- /dev/null
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -0,0 +1,234 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd_test
+
+import (
+       "context"
+       "strings"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/banyand/discovery"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+       "github.com/apache/skywalking-banyandb/banyand/liaison/http"
+       "github.com/apache/skywalking-banyandb/banyand/measure"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/query"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/ghodss/yaml"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/spf13/cobra"
+       "github.com/zenizh/go-capturer"
+
+       database_v1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// The "Stream" container drives the bydbctl `stream` sub-commands (get,
+// update, delete, list) through a cobra root command and checks what they
+// print to stdout.
+var _ = Describe("Stream", func() {
+       var path string
+       var gracefulStop, deferFunc func()
+       var listenClientURL, listenPeerURL string
+       var rootCmd *cobra.Command
+       // Each spec gets its own data directory, etcd listen URLs and set of
+       // modules, and starts with group "group1" and stream "group1.name1"
+       // already created through the CLI.
+       BeforeEach(func() {
+               var err error
+               path, deferFunc, err = test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               listenClientURL, listenPeerURL, err = test.NewEtcdListenUrls()
+               Expect(err).NotTo(HaveOccurred())
+               flags := []string{
+                       "--stream-root-path=" + path, "--measure-root-path=" + path, "--metadata-root-path=" + path,
+                       "--etcd-listen-client-url=" + listenClientURL, "--etcd-listen-peer-url=" + listenPeerURL,
+               }
+               gracefulStop = setup(false, flags)
+               Eventually(helpers.HTTPHealthCheck("localhost:17913"), 10*time.Second).Should(Succeed())
+               // NOTE(review): presumably extra settling time after the health check passes - confirm it is still needed.
+               time.Sleep(1 * time.Second)
+               // Seed the group and the stream schema that every spec relies on.
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               rootCmd.SetArgs([]string{"group", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: group1`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("group group1 is created"))
+               rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name1
+  group: group1`))
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("stream group1.name1 is created"))
+       })
+
+       It("get stream schema", func() {
+               rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n", "name1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               // The command prints YAML; decode it back into the response message.
+               resp := new(database_v1.StreamRegistryServiceGetResponse)
+               Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+               Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
+               Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
+       })
+
+       It("update stream schema", func() {
+               rootCmd.SetArgs([]string{"stream", "update", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name1
+  group: group1
+entity:
+  tagNames: ["tag1"]`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("stream group1.name1 is updated"))
+               // Read the schema back to verify that the entity change is visible.
+               rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n", "name1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.StreamRegistryServiceGetResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               Expect(resp.Stream.Metadata.Group).To(Equal("group1"))
+               Expect(resp.Stream.Metadata.Name).To(Equal("name1"))
+               Expect(resp.Stream.Entity.TagNames[0]).To(Equal("tag1"))
+       })
+
+       It("delete stream schema", func() {
+               // delete
+               rootCmd.SetArgs([]string{"stream", "delete", "-g", "group1", "-n", "name1"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("stream group1.name1 is deleted"))
+               // get again: the command itself is expected to succeed and report
+               // the missing resource on stdout
+               rootCmd.SetArgs([]string{"stream", "get", "-g", "group1", "-n", "name1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("resource not found"))
+       })
+
+       It("list stream schema", func() {
+               // create another stream schema for list operation
+               rootCmd.SetArgs([]string{"stream", "create", "-f", "-"})
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: name2
+  group: group1`))
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("stream group1.name2 is created"))
+               // list: name1 from BeforeEach plus name2 created above
+               rootCmd.SetArgs([]string{"stream", "list", "-g", "group1"})
+               out = capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               resp := new(database_v1.StreamRegistryServiceListResponse)
+               Expect(yaml.Unmarshal([]byte(out), resp)).To(Succeed())
+               Expect(resp.Stream).To(HaveLen(2))
+       })
+
+       // Stop the modules first, then release the temporary space.
+       AfterEach(func() {
+               gracefulStop()
+               deferFunc()
+       })
+})
+
+// setup builds the discovery, queue, metadata, stream, measure and query
+// components plus the gRPC and HTTP liaison servers, hands them to
+// test.SetUpModules together with flags, and returns the function that call
+// yields. With loadMetadata set, the preload stream and measure units are
+// placed directly after the metadata service.
+func setup(loadMetadata bool, flags []string) func() {
+       serviceRepo, err := discovery.NewServiceRepo(context.Background())
+       Expect(err).NotTo(HaveOccurred())
+
+       dataPipeline, err := queue.NewQueue(context.TODO(), serviceRepo)
+       Expect(err).NotTo(HaveOccurred())
+
+       metadataService, err := metadata.NewService(context.TODO())
+       Expect(err).NotTo(HaveOccurred())
+
+       streamService, err := stream.NewService(context.TODO(), metadataService, serviceRepo, dataPipeline)
+       Expect(err).NotTo(HaveOccurred())
+
+       measureService, err := measure.NewService(context.TODO(), metadataService, serviceRepo, dataPipeline)
+       Expect(err).NotTo(HaveOccurred())
+
+       queryExecutor, err := query.NewExecutor(context.TODO(), streamService, measureService, metadataService, serviceRepo, dataPipeline)
+       Expect(err).NotTo(HaveOccurred())
+
+       grpcServer := grpc.NewServer(context.TODO(), dataPipeline, serviceRepo, metadataService)
+       httpServer := http.NewService()
+
+       // Plain start-up: no preload units in the chain.
+       if !loadMetadata {
+               return test.SetUpModules(
+                       flags,
+                       serviceRepo,
+                       dataPipeline,
+                       metadataService,
+                       streamService,
+                       measureService,
+                       queryExecutor,
+                       grpcServer,
+                       httpServer,
+               )
+       }
+       // Preload units sit between the metadata service and the stream/measure services.
+       return test.SetUpModules(
+               flags,
+               serviceRepo,
+               dataPipeline,
+               metadataService,
+               &preloadStreamService{metaSvc: metadataService},
+               &preloadMeasureService{metaSvc: metadataService},
+               streamService,
+               measureService,
+               queryExecutor,
+               grpcServer,
+               httpServer,
+       )
+}
+
+// preloadStreamService is the unit setup registers right after the metadata
+// service when loadMetadata is true. It only carries the metadata service.
+type preloadStreamService struct {
+       metaSvc metadata.Service
+}
+
+// preloadMeasureService is the measure counterpart of preloadStreamService.
+type preloadMeasureService struct {
+       metaSvc metadata.Service
+}
+
+// Name returns the fixed unit name "preload-stream".
+func (p *preloadStreamService) Name() string {
+       return "preload-stream"
+}
+
+// Name returns the fixed unit name "preload-measure".
+func (p *preloadMeasureService) Name() string {
+       return "preload-measure"
+}
diff --git a/bydbctl/internal/cmd/root_test.go b/bydbctl/internal/cmd/use.go
similarity index 60%
rename from bydbctl/internal/cmd/root_test.go
rename to bydbctl/internal/cmd/use.go
index 943ca0a..a4429bf 100644
--- a/bydbctl/internal/cmd/root_test.go
+++ b/bydbctl/internal/cmd/use.go
@@ -15,13 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package cmd_test
+package cmd
 
 import (
-       . "github.com/onsi/ginkgo/v2"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/version"
+       "github.com/spf13/cobra"
+       "github.com/spf13/viper"
 )
 
-var _ = Describe("Root", func() {
-       It("foo", func() {
-       })
-})
+// newUserCmd builds the "use" command. It takes exactly one argument, stores
+// it as the "group" setting in viper, persists the configuration with
+// viper.WriteConfig and prints a confirmation line.
+func newUserCmd() *cobra.Command {
+       return &cobra.Command{
+               Use:     "use group",
+               Version: version.Build(),
+               Short:   "Select a group",
+               Args:    cobra.ExactArgs(1),
+               RunE: func(_ *cobra.Command, args []string) error {
+                       viper.Set("group", args[0])
+                       if err := viper.WriteConfig(); err != nil {
+                               return err
+                       }
+                       fmt.Printf("Switched to [%s]", viper.GetString("group"))
+                       // Terminate the line, as the stream commands do, so the
+                       // next output does not continue on the same line.
+                       fmt.Println()
+                       return nil
+               },
+       }
+}
diff --git a/bydbctl/pkg/file/read.go b/bydbctl/pkg/file/read.go
new file mode 100644
index 0000000..f4708b1
--- /dev/null
+++ b/bydbctl/pkg/file/read.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package file provides utils to handle files
+package file
+
+import (
+       "bufio"
+       "io"
+       "os"
+       "path/filepath"
+)
+
+// Read loads the raw contents addressed by path.
+//
+// If path is "-", everything readable from reader is returned as a single
+// entry. If path names a non-directory, that file's bytes are returned as a
+// single entry. If path names a directory, the tree is walked and every
+// non-directory entry whose extension is ".yml" or ".yaml" contributes one
+// entry, in walk order. On failure the returned contents are nil.
+func Read(path string, reader io.Reader) (contents [][]byte, err error) {
+       if path == "-" {
+               b, readErr := io.ReadAll(bufio.NewReader(reader))
+               if readErr != nil {
+                       return nil, readErr
+               }
+               return [][]byte{b}, nil
+       }
+       fileInfo, err := os.Stat(path)
+       if err != nil {
+               return nil, err
+       }
+       if !fileInfo.IsDir() {
+               b, readErr := os.ReadFile(path)
+               if readErr != nil {
+                       return nil, readErr
+               }
+               return [][]byte{b}, nil
+       }
+       err = filepath.Walk(path, func(p string, info os.FileInfo, walkErr error) error {
+               if walkErr != nil {
+                       return walkErr // info may be nil here; stop before touching it
+               }
+               // A directory may itself be named "*.yaml"; descend into it instead
+               // of trying to read it as a file.
+               if info.IsDir() {
+                       return nil
+               }
+               switch filepath.Ext(p) {
+               case ".yml", ".yaml":
+                       content, readErr := os.ReadFile(p)
+                       if readErr != nil {
+                               return readErr
+                       }
+                       contents = append(contents, content)
+               }
+               return nil
+       })
+       if err != nil {
+               // Do not hand back a partially filled result together with an error.
+               return nil, err
+       }
+       return contents, nil
+}
diff --git a/dist/LICENSE b/dist/LICENSE
index 090386f..0143495 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -301,6 +301,7 @@ MIT licenses
     github.com/dustin/go-humanize v1.0.0 MIT
     github.com/form3tech-oss/jwt-go v3.2.3+incompatible MIT
     github.com/go-chi/chi/v5 v5.0.7 MIT
+    github.com/go-resty/resty/v2 v2.7.0 MIT
     github.com/json-iterator/go v1.1.12 MIT
     github.com/mitchellh/mapstructure v1.5.0 MIT
     github.com/onsi/ginkgo/v2 v2.1.4 MIT
@@ -316,8 +317,9 @@ MIT licenses
     github.com/subosito/gotenv v1.3.0 MIT
     github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT
     github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT
+    github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 MIT
     go.etcd.io/bbolt v1.3.6 MIT
-    go.uber.org/atomic v1.7.0 MIT
+    go.uber.org/atomic v1.9.0 MIT
     go.uber.org/multierr v1.8.0 MIT
     go.uber.org/zap v1.17.0 MIT
     gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT
@@ -333,6 +335,7 @@ MIT and Apache-2.0 licenses
 MIT and BSD-3-Clause licenses
 ========================================================================
 
+    github.com/ghodss/yaml v1.0.0 MIT and BSD-3-Clause
     sigs.k8s.io/yaml v1.2.0 MIT and BSD-3-Clause
 
 ========================================================================
diff --git a/dist/licenses/license-github.com-ghodss-yaml.txt b/dist/licenses/license-github.com-ghodss-yaml.txt
new file mode 100644
index 0000000..7805d36
--- /dev/null
+++ b/dist/licenses/license-github.com-ghodss-yaml.txt
@@ -0,0 +1,50 @@
+The MIT License (MIT)
+
+Copyright (c) 2014 Sam Ghods
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
+
+Copyright (c) 2012 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+   * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+   * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+   * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/dist/licenses/license-github.com-go-resty-resty-v2.txt b/dist/licenses/license-github.com-go-resty-resty-v2.txt
new file mode 100644
index 0000000..27326a6
--- /dev/null
+++ b/dist/licenses/license-github.com-go-resty-resty-v2.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015-2021 Jeevanandam M., https://myjeeva.com <[email protected]>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/dist/licenses/license-github.com-zenizh-go-capturer.txt b/dist/licenses/license-github.com-zenizh-go-capturer.txt
new file mode 100644
index 0000000..549d46b
--- /dev/null
+++ b/dist/licenses/license-github.com-zenizh-go-capturer.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 kami
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/go.mod b/go.mod
index 1032999..367d4a1 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,9 @@ require (
        github.com/dgraph-io/badger/v3 v3.2011.1
        github.com/dgraph-io/ristretto v0.1.0
        github.com/envoyproxy/protoc-gen-validate v0.1.0
+       github.com/ghodss/yaml v1.0.0
        github.com/go-chi/chi/v5 v5.0.7
+       github.com/go-resty/resty/v2 v2.7.0
        github.com/golang/mock v1.6.0
        github.com/google/go-cmp v0.5.8
        github.com/google/uuid v1.3.0
@@ -28,6 +30,7 @@ require (
        github.com/spf13/pflag v1.0.5
        github.com/spf13/viper v1.12.0
        github.com/stretchr/testify v1.7.1
+       github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
        go.etcd.io/etcd/client/v3 v3.5.4
        go.etcd.io/etcd/server/v3 v3.5.4
        go.uber.org/multierr v1.8.0
@@ -112,7 +115,7 @@ require (
        go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
        go.opentelemetry.io/otel/trace v0.20.0 // indirect
        go.opentelemetry.io/proto/otlp v0.7.0 // indirect
-       go.uber.org/atomic v1.7.0 // indirect
+       go.uber.org/atomic v1.9.0 // indirect
        go.uber.org/zap v1.17.0 // indirect
        golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
        golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
diff --git a/go.sum b/go.sum
index b62e59b..d4f717b 100644
--- a/go.sum
+++ b/go.sum
@@ -172,6 +172,7 @@ github.com/fsnotify/fsnotify v1.5.4 
h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwV
 github.com/fsnotify/fsnotify v1.5.4/go.mod 
h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
 github.com/getsentry/raven-go v0.2.0 
h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
 github.com/getsentry/raven-go v0.2.0/go.mod 
h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
+github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
 github.com/go-chi/chi/v5 v5.0.7/go.mod 
h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
@@ -184,6 +185,8 @@ github.com/go-kit/log v0.1.0/go.mod 
h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
 github.com/go-logfmt/logfmt v0.3.0/go.mod 
h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod 
h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod 
h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-resty/resty/v2 v2.7.0 
h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
+github.com/go-resty/resty/v2 v2.7.0/go.mod 
h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/godbus/dbus/v5 v5.0.4/go.mod 
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -500,6 +503,8 @@ github.com/yuin/goldmark v1.1.32/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
 github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 github.com/yuin/goldmark v1.4.0/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 
h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w=
+github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod 
h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
 go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
@@ -548,8 +553,9 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod 
h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g
 go.opentelemetry.io/proto/otlp v0.7.0 
h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
-go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
+go.uber.org/atomic v1.9.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -658,6 +664,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 
h1:NWy5+hlRbC7HK+PmcXVUmW1IMyFce7to56IUvhUFm7Y=
 golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index eb7b8aa..90400ec 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -20,6 +20,7 @@ package schema
 import (
        "context"
        "io"
+       "runtime"
        "sync"
        "sync/atomic"
        "time"
@@ -141,45 +142,50 @@ func (sr *schemaRepo) SendMetadataEvent(event 
MetadataEvent) {
 }
 
 func (sr *schemaRepo) Watcher() {
-       defer func() {
-               if err := recover(); err != nil {
-                       sr.l.Warn().Interface("err", err).Msg("watching the 
events")
-               }
-       }()
-       for {
-               select {
-               case evt, more := <-sr.eventCh:
-                       if !more {
-                               return
-                       }
-                       sr.l.Info().Interface("event", evt).Msg("received an 
event")
-                       for i := 0; i < 10; i++ {
-                               var err error
-                               switch evt.Typ {
-                               case EventAddOrUpdate:
-                                       switch evt.Kind {
-                                       case EventKindGroup:
-                                               _, err = 
sr.StoreGroup(evt.Metadata)
-                                       case EventKindResource:
-                                               _, err = 
sr.storeResource(evt.Metadata)
+       for i := 0; i < 10; i++ {
+               go func() {
+                       defer func() {
+                               if err := recover(); err != nil {
+                                       sr.l.Warn().Interface("err", 
err).Msg("watching the events")
+                               }
+                       }()
+                       for {
+                               select {
+                               case evt, more := <-sr.eventCh:
+                                       if !more {
+                                               return
                                        }
-                               case EventDelete:
-                                       switch evt.Kind {
-                                       case EventKindGroup:
-                                               err = 
sr.deleteGroup(evt.Metadata)
-                                       case EventKindResource:
-                                               err = 
sr.deleteResource(evt.Metadata)
+                                       sr.l.Info().Interface("event", 
evt).Msg("received an event")
+                                       for i := 0; i < 10; i++ {
+                                               var err error
+                                               switch evt.Typ {
+                                               case EventAddOrUpdate:
+                                                       switch evt.Kind {
+                                                       case EventKindGroup:
+                                                               _, err = 
sr.StoreGroup(evt.Metadata)
+                                                       case EventKindResource:
+                                                               _, err = 
sr.storeResource(evt.Metadata)
+                                                       }
+                                               case EventDelete:
+                                                       switch evt.Kind {
+                                                       case EventKindGroup:
+                                                               err = 
sr.deleteGroup(evt.Metadata)
+                                                       case EventKindResource:
+                                                               err = 
sr.deleteResource(evt.Metadata)
+                                                       }
+                                               }
+                                               if err == nil {
+                                                       break
+                                               }
+                                               runtime.Gosched()
+                                               time.Sleep(time.Second)
+                                               
sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the 
metadata event. retry...")
                                        }
+                               case <-sr.workerStopCh:
+                                       return
                                }
-                               if err == nil {
-                                       break
-                               }
-                               time.Sleep(time.Second)
-                               sr.l.Err(err).Interface("event", 
evt).Int("round", i).Msg("fail to handle the metadata event. retry...")
                        }
-               case <-sr.workerStopCh:
-                       return
-               }
+               }()
        }
 }
 
@@ -344,9 +350,6 @@ func (sr *schemaRepo) Close() {
                        sr.l.Warn().Interface("err", err).Msg("closing 
resource")
                }
        }()
-       if sr.eventCh != nil {
-               close(sr.eventCh)
-       }
        if sr.workerStopCh != nil {
                close(sr.workerStopCh)
        }
diff --git a/bydbctl/cmd/bydbctl/main.go b/pkg/test/helpers/http_health.go
similarity index 58%
copy from bydbctl/cmd/bydbctl/main.go
copy to pkg/test/helpers/http_health.go
index ebb144e..ec6aa2f 100644
--- a/bydbctl/cmd/bydbctl/main.go
+++ b/pkg/test/helpers/http_health.go
@@ -6,7 +6,7 @@
 // not use this file except in compliance with the License.
 // You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//     http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
@@ -14,20 +14,30 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-// Package main provides main entry for the command-line toolkit, i.e. bydbctl
-package main
+package helpers
 
 import (
        "fmt"
-       "os"
 
-       "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/go-resty/resty/v2"
 )
 
-func main() {
-       if err := cmd.NewRoot().Execute(); err != nil {
-               _, _ = fmt.Fprintln(os.Stderr, err)
-               os.Exit(1)
+// HTTPHealthCheck returns a probe that sends a GET request to
+// http://<addr>/api/healthz. The probe returns the transport error if the
+// request fails, ErrServiceUnhealthy if the response status is not 200, and
+// nil otherwise.
+func HTTPHealthCheck(addr string) func() error {
+       return func() error {
+               client := resty.New()
+
+               resp, err := client.R().
+                       SetHeader("Accept", "application/json").
+                       Get(fmt.Sprintf("http://%s/api/healthz", addr))
+               if err != nil {
+                       return err
+               }
+
+               if resp.StatusCode() != 200 {
+                       l.Warn().Str("responded_status", resp.Status()).Msg("service unhealthy")
+                       return ErrServiceUnhealthy
+               }
+               l.Info().Stringer("response", resp).Msg("connected")
+               return nil
        }
 }

Reply via email to