This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4722514e Test distributed metadata (#329)
4722514e is described below

commit 4722514eeb64bad04f0e36cc260740c8968edf39
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 14 22:20:52 2023 +0800

    Test distributed metadata (#329)
---
 banyand/cmd/server/main.go                         |  5 +-
 banyand/dquery/dquery.go                           | 15 ++++
 banyand/dquery/measure.go                          |  2 +-
 banyand/dquery/stream.go                           |  2 +-
 banyand/measure/metadata.go                        |  7 +-
 banyand/metadata/client.go                         |  7 +-
 banyand/metadata/schema/etcd.go                    | 60 ++++++++++-----
 banyand/metadata/schema/etcd_test.go               |  2 +-
 banyand/metadata/schema/group.go                   | 33 +++-----
 banyand/metadata/schema/node.go                    |  3 +-
 banyand/metadata/schema/property.go                |  5 +-
 banyand/metadata/schema/register_test.go           |  9 ++-
 banyand/metadata/schema/schema_suite_test.go       | 37 ---------
 banyand/metadata/schema/watcher.go                 | 14 +++-
 banyand/metadata/schema/watcher_test.go            | 30 ++++++++
 banyand/observability/pprof.go                     | 10 ++-
 banyand/observability/service.go                   |  6 ++
 banyand/queue/local.go                             |  4 +
 banyand/queue/pub/client.go                        | 13 +++-
 banyand/queue/queue.go                             |  1 +
 banyand/queue/sub/server.go                        |  4 +-
 banyand/stream/metadata.go                         |  8 +-
 .../cmd/storage.go => pkg/cmdsetup/data.go         | 60 +++++----------
 {banyand/internal/cmd => pkg/cmdsetup}/liaison.go  | 18 ++---
 {banyand/internal/cmd => pkg/cmdsetup}/root.go     | 13 ++--
 .../internal/cmd => pkg/cmdsetup}/standalone.go    | 15 ++--
 pkg/logger/logger.go                               |  2 +-
 pkg/logger/setting.go                              |  8 +-
 pkg/test/helpers/etcd.go                           | 66 ++++++++++++++++
 pkg/test/setup/setup.go                            | 21 +++++
 pkg/test/stream/etcd.go                            |  3 +-
 scripts/build/lint.mk                              |  2 +-
 test/integration/distributed/setup/node_test.go    | 89 ++++++++++++++++++++++
 .../distributed/setup/setup_suite_test.go          | 60 ++++++++-------
 .../cold_query/query_suite_test.go                 |  3 +-
 .../{ => standalone}/other/measure_test.go         |  0
 .../{ => standalone}/other/other_suite_test.go     |  3 +-
 .../{ => standalone}/other/property_test.go        |  0
 .../other/testdata/server_cert.pem                 |  0
 .../{ => standalone}/other/testdata/server_key.pem |  0
 .../integration/{ => standalone}/other/tls_test.go |  0
 .../{ => standalone}/query/query_suite_test.go     |  3 +-
 .../integration/standalone/standalone.go           | 19 +----
 43 files changed, 449 insertions(+), 213 deletions(-)

diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go
index 8906f84d..99827d13 100644
--- a/banyand/cmd/server/main.go
+++ b/banyand/cmd/server/main.go
@@ -22,11 +22,12 @@ import (
        "fmt"
        "os"
 
-       "github.com/apache/skywalking-banyandb/banyand/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
+       "github.com/apache/skywalking-banyandb/pkg/signal"
 )
 
 func main() {
-       if err := cmd.NewRoot().Execute(); err != nil {
+       if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
                _, _ = fmt.Fprintln(os.Stderr, err)
                os.Exit(1)
        }
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index 61f490c8..4d77b6fb 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -35,12 +35,15 @@ const (
        moduleName = "distributed-query"
 )
 
+var _ run.Service = (*queryService)(nil)
+
 type queryService struct {
        log         *logger.Logger
        metaService metadata.Repo
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
        tqp         *topNQueryProcessor
+       closer      *run.Closer
 }
 
 // NewService return a new query service.
@@ -48,6 +51,7 @@ func NewService(metaService metadata.Repo, broadcaster 
bus.Broadcaster,
 ) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
+               closer:      run.NewCloser(1),
        }
        svc.sqp = &streamQueryProcessor{
                queryService: svc,
@@ -75,6 +79,17 @@ func (q *queryService) PreRun(_ context.Context) error {
        return nil
 }
 
+func (q *queryService) GracefulStop() {
+       q.sqp.streamService.Close()
+       q.mqp.measureService.Close()
+       q.closer.Done()
+       q.closer.CloseThenWait()
+}
+
+func (q *queryService) Serve() run.StopNotify {
+       return q.closer.CloseNotify()
+}
+
 var _ executor.DistributedExecutionContext = (*distributedContext)(nil)
 
 type distributedContext struct {
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 522a9cda..b145de94 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -31,7 +31,7 @@ import (
 )
 
 type measureQueryProcessor struct {
-       measureService measure.Query
+       measureService measure.SchemaService
        broadcaster    bus.Broadcaster
        *queryService
 }
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 3a92aaeb..62804d00 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -31,7 +31,7 @@ import (
 )
 
 type streamQueryProcessor struct {
-       streamService stream.Query
+       streamService stream.SchemaService
        broadcaster   bus.Broadcaster
        *queryService
 }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 1185442b..26a6b648 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -42,6 +42,11 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+// SchemaService allows querying schema information.
+type SchemaService interface {
+       Query
+       Close()
+}
 type schemaRepo struct {
        resourceSchema.Repository
        l        *logger.Logger
@@ -65,7 +70,7 @@ func newSchemaRepo(path string, metadata metadata.Repo,
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) 
SchemaService {
        r := &schemaRepo{
                l:        l,
                metadata: metadata,
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 33c258be..d1271f20 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -41,6 +41,7 @@ func NewClient(_ context.Context) (Service, error) {
 }
 
 type clientService struct {
+       namespace      string
        schemaRegistry schema.Registry
        alc            *allocator
        closer         *run.Closer
@@ -53,6 +54,7 @@ func (s *clientService) SchemaRegistry() schema.Registry {
 
 func (s *clientService) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
+       fs.StringVar(&s.namespace, "namespace", "banyandb", "The namespace of 
the metadata stored in etcd")
        fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, 
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
        return fs
 }
@@ -66,7 +68,10 @@ func (s *clientService) Validate() error {
 
 func (s *clientService) PreRun(ctx context.Context) error {
        var err error
-       s.schemaRegistry, err = 
schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints))
+       s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
+               schema.Namespace(s.namespace),
+               schema.ConfigureServerEndpoints(s.endpoints),
+       )
        if err != nil {
                return err
        }
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index aff8e9ad..f53a718c 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -20,13 +20,13 @@ package schema
 import (
        "context"
        "fmt"
+       "path"
        "sync"
        "time"
 
        "github.com/pkg/errors"
        clientv3 "go.etcd.io/etcd/client/v3"
        "go.uber.org/zap"
-       "google.golang.org/grpc"
        "google.golang.org/protobuf/proto"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -55,6 +55,13 @@ type HasMetadata interface {
 // RegistryOption is the option to create Registry.
 type RegistryOption func(*etcdSchemaRegistryConfig)
 
+// Namespace sets the namespace of the registry.
+func Namespace(namespace string) RegistryOption {
+       return func(config *etcdSchemaRegistryConfig) {
+               config.namespace = namespace
+       }
+}
+
 // ConfigureServerEndpoints sets a list of the server urls.
 func ConfigureServerEndpoints(url []string) RegistryOption {
        return func(config *etcdSchemaRegistryConfig) {
@@ -63,14 +70,16 @@ func ConfigureServerEndpoints(url []string) RegistryOption {
 }
 
 type etcdSchemaRegistry struct {
-       client   *clientv3.Client
-       closer   *run.Closer
-       l        *logger.Logger
-       watchers []*watcher
-       mux      sync.RWMutex
+       namespace string
+       client    *clientv3.Client
+       closer    *run.Closer
+       l         *logger.Logger
+       watchers  []*watcher
+       mux       sync.RWMutex
 }
 
 type etcdSchemaRegistryConfig struct {
+       namespace       string
        serverEndpoints []string
 }
 
@@ -124,7 +133,6 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
                DialTimeout:          5 * time.Second,
                DialKeepAliveTime:    30 * time.Second,
                DialKeepAliveTimeout: 10 * time.Second,
-               DialOptions:          []grpc.DialOption{grpc.WithBlock()},
                Logger:               l,
        }
        client, err := clientv3.New(config)
@@ -132,18 +140,27 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
                return nil, err
        }
        reg := &etcdSchemaRegistry{
-               client: client,
-               closer: run.NewCloser(1),
-               l:      log,
+               namespace: registryConfig.namespace,
+               client:    client,
+               closer:    run.NewCloser(1),
+               l:         log,
        }
        return reg, nil
 }
 
+func (e *etcdSchemaRegistry) prependNamespace(key string) string {
+       if e.namespace == "" {
+               return key
+       }
+       return path.Join("/", e.namespace, key)
+}
+
 func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message 
proto.Message) error {
        if !e.closer.AddRunning() {
                return ErrClosed
        }
        defer e.closer.Done()
+       key = e.prependNamespace(key)
        resp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
@@ -177,6 +194,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, 
metadata Metadata) erro
        if err != nil {
                return err
        }
+       key = e.prependNamespace(key)
        getResp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
@@ -228,6 +246,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, 
metadata Metadata) erro
        if err != nil {
                return err
        }
+       key = e.prependNamespace(key)
        getResp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
@@ -255,6 +274,7 @@ func (e *etcdSchemaRegistry) listWithPrefix(ctx 
context.Context, prefix string,
                return nil, ErrClosed
        }
        defer e.closer.Done()
+       prefix = e.prependNamespace(prefix)
        resp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix())
        if err != nil {
                return nil, err
@@ -279,6 +299,7 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, 
metadata Metadata) (boo
        if err != nil {
                return false, err
        }
+       key = e.prependNamespace(key)
        resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV())
        if err != nil {
                return false, err
@@ -298,6 +319,7 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, 
metadata Metadata) er
        if err != nil {
                return err
        }
+       key = e.prependNamespace(key)
        val, err := proto.Marshal(metadata.Spec.(proto.Message))
        if err != nil {
                return err
@@ -349,19 +371,19 @@ func (e *etcdSchemaRegistry) register(ctx 
context.Context, metadata Metadata) er
 }
 
 func (e *etcdSchemaRegistry) newWatcher(name string, kind Kind, handler 
EventHandler) *watcher {
-       return newWatcher(e.client, kind, handler, 
e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
+       return newWatcher(e.client, watcherConfig{
+               key:     e.prependNamespace(kind.key()),
+               kind:    kind,
+               handler: handler,
+       }, e.l.Named(fmt.Sprintf("watcher-%s[%s]", name, kind.String())))
 }
 
 func listPrefixesForEntity(group, entityPrefix string) string {
-       return entityPrefix + "/" + group
+       return path.Join(entityPrefix, group)
 }
 
 func formatKey(entityPrefix string, metadata *commonv1.Metadata) string {
-       return listPrefixesForEntity(metadata.GetGroup(), entityPrefix) + "/" + 
metadata.GetName()
-}
-
-func incrementLastByte(key string) string {
-       bb := []byte(key)
-       bb[len(bb)-1]++
-       return string(bb)
+       return path.Join(
+               listPrefixesForEntity(metadata.GetGroup(), entityPrefix),
+               metadata.GetName())
 }
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index 2ed06549..fc98664f 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -96,7 +96,7 @@ func preloadSchema(e Registry) error {
                return err
        }
        for _, entry := range entries {
-               data, err := indexRuleStore.ReadFile(indexRuleDir + "/" + 
entry.Name())
+               data, err := indexRuleStore.ReadFile(path.Join(indexRuleDir, 
entry.Name()))
                if err != nil {
                        return err
                }
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index f8d1545c..5afa54d2 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -19,20 +19,16 @@ package schema
 
 import (
        "context"
-       "strings"
+       "path"
 
        "github.com/pkg/errors"
        clientv3 "go.etcd.io/etcd/client/v3"
-       "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 )
 
-var (
-       groupsKeyPrefix  = "/groups/"
-       groupMetadataKey = "/__meta_group__"
-)
+var groupsKeyPrefix = "/groups/"
 
 func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) 
(*commonv1.Group, error) {
        var entity commonv1.Group
@@ -44,24 +40,15 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, 
group string) (*commo
 }
 
 func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) 
([]*commonv1.Group, error) {
-       messages, err := e.client.Get(ctx, groupsKeyPrefix, 
clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
+       messages, err := e.listWithPrefix(ctx, groupsKeyPrefix, KindGroup)
        if err != nil {
                return nil, err
        }
-
-       var groups []*commonv1.Group
-       for _, kv := range messages.Kvs {
-               // kv.Key = "/groups/" + {group} + "/__meta_info__"
-               if strings.HasSuffix(string(kv.Key), groupMetadataKey) {
-                       message := &commonv1.Group{}
-                       if innerErr := proto.Unmarshal(kv.Value, message); 
innerErr != nil {
-                               return nil, innerErr
-                       }
-                       groups = append(groups, message)
-               }
+       entities := make([]*commonv1.Group, 0, len(messages))
+       for _, message := range messages {
+               entities = append(entities, message.(*commonv1.Group))
        }
-
-       return groups, nil
+       return entities, nil
 }
 
 func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) 
(bool, error) {
@@ -72,9 +59,9 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, 
group string) (boo
        keysToDelete := allKeys()
        deleteOPs := make([]clientv3.Op, 0, len(keysToDelete)+1)
        for _, key := range keysToDelete {
-               deleteOPs = append(deleteOPs, 
clientv3.OpDelete(listPrefixesForEntity(group, key), clientv3.WithPrefix()))
+               deleteOPs = append(deleteOPs, 
clientv3.OpDelete(e.prependNamespace(listPrefixesForEntity(group, key)), 
clientv3.WithPrefix()))
        }
-       deleteOPs = append(deleteOPs, clientv3.OpDelete(formatGroupKey(group), 
clientv3.WithPrefix()))
+       deleteOPs = append(deleteOPs, 
clientv3.OpDelete(e.prependNamespace(formatGroupKey(group)), 
clientv3.WithPrefix()))
        txnResponse, err := e.client.Txn(ctx).Then(deleteOPs...).Commit()
        if err != nil {
                return false, err
@@ -109,5 +96,5 @@ func (e *etcdSchemaRegistry) UpdateGroup(ctx 
context.Context, group *commonv1.Gr
 }
 
 func formatGroupKey(group string) string {
-       return groupsKeyPrefix + group + groupMetadataKey
+       return path.Join(groupsKeyPrefix, group)
 }
diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go
index c383c134..575b8c89 100644
--- a/banyand/metadata/schema/node.go
+++ b/banyand/metadata/schema/node.go
@@ -19,6 +19,7 @@ package schema
 
 import (
        "context"
+       "path"
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 )
@@ -57,5 +58,5 @@ func (e *etcdSchemaRegistry) RegisterNode(ctx 
context.Context, node *databasev1.
 }
 
 func formatNodeKey(name string) string {
-       return nodeKeyPrefix + name
+       return path.Join(nodeKeyPrefix, name)
 }
diff --git a/banyand/metadata/schema/property.go 
b/banyand/metadata/schema/property.go
index 7d9b1c5d..bb76f0a8 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -19,6 +19,7 @@ package schema
 
 import (
        "context"
+       "path"
 
        "github.com/pkg/errors"
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -62,7 +63,7 @@ func (e *etcdSchemaRegistry) ListProperty(ctx 
context.Context, container *common
        if container.Group == "" {
                return nil, BadRequest("container.group", "group should not be 
empty")
        }
-       messages, err := e.listWithPrefix(ctx, 
listPrefixesForEntity(container.Group+"/"+container.Name, propertyKeyPrefix), 
KindProperty)
+       messages, err := e.listWithPrefix(ctx, 
listPrefixesForEntity(path.Join(container.Group, container.Name), 
propertyKeyPrefix), KindProperty)
        if err != nil {
                return nil, err
        }
@@ -165,7 +166,7 @@ func (e *etcdSchemaRegistry) DeleteProperty(ctx 
context.Context, metadata *prope
 func transformKey(metadata *propertyv1.Metadata) *commonv1.Metadata {
        return &commonv1.Metadata{
                Group: metadata.Container.GetGroup(),
-               Name:  metadata.Container.Name + "/" + metadata.Id,
+               Name:  path.Join(metadata.Container.Name, metadata.Id),
        }
 }
 
diff --git a/banyand/metadata/schema/register_test.go 
b/banyand/metadata/schema/register_test.go
index 28e7162a..9f05a999 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -58,7 +58,10 @@ var _ = ginkgo.Describe("etcd_register", func() {
                        embeddedetcd.RootDir(randomTempDir()))
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                <-server.ReadyNotify()
-               schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
+               schemaRegistry, err := NewEtcdSchemaRegistry(
+                       Namespace("test"),
+                       ConfigureServerEndpoints(endpoints),
+               )
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                r = schemaRegistry.(*etcdSchemaRegistry)
        })
@@ -74,7 +77,9 @@ var _ = ginkgo.Describe("etcd_register", func() {
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                gomega.Expect(r.get(context.Background(), k, 
&databasev1.Node{})).ShouldNot(gomega.HaveOccurred())
                gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred())
-               schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
+               schemaRegistry, err := NewEtcdSchemaRegistry(
+                       Namespace("test"),
+                       ConfigureServerEndpoints(endpoints))
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                r = schemaRegistry.(*etcdSchemaRegistry)
                gomega.Expect(r.get(context.Background(), k, 
&databasev1.Node{})).Should(gomega.MatchError(ErrGRPCResourceNotFound))
diff --git a/banyand/metadata/schema/schema_suite_test.go 
b/banyand/metadata/schema/schema_suite_test.go
index 383a4f54..2173b3ee 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/banyand/metadata/schema/schema_suite_test.go
@@ -18,50 +18,13 @@
 package schema
 
 import (
-       "fmt"
        "testing"
 
        "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
-
-       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/test"
-       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestSchema(t *testing.T) {
        gomega.RegisterFailHandler(ginkgo.Fail)
        ginkgo.RunSpecs(t, "Schema Suite")
 }
-
-var (
-       server   embeddedetcd.Server
-       registry *etcdSchemaRegistry
-)
-
-var _ = ginkgo.BeforeSuite(func() {
-       gomega.Expect(logger.Init(logger.Logging{
-               Env:   "dev",
-               Level: flags.LogLevel,
-       })).To(gomega.Succeed())
-       ports, err := test.AllocateFreePorts(2)
-       if err != nil {
-               panic("fail to find free ports")
-       }
-       endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
-       server, err = embeddedetcd.NewServer(
-               embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
-               embeddedetcd.RootDir(randomTempDir()))
-       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-       <-server.ReadyNotify()
-       schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
-       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-       registry = schemaRegistry.(*etcdSchemaRegistry)
-})
-
-var _ = ginkgo.AfterSuite(func() {
-       registry.Close()
-       server.Close()
-       <-server.StopNotify()
-})
diff --git a/banyand/metadata/schema/watcher.go 
b/banyand/metadata/schema/watcher.go
index 3a93828c..bd9b2b8f 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -27,6 +27,12 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+type watcherConfig struct {
+       handler EventHandler
+       key     string
+       kind    Kind
+}
+
 type watcher struct {
        handler EventHandler
        cli     *clientv3.Client
@@ -36,12 +42,12 @@ type watcher struct {
        kind    Kind
 }
 
-func newWatcher(cli *clientv3.Client, kind Kind, handler EventHandler, l 
*logger.Logger) *watcher {
+func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) 
*watcher {
        w := &watcher{
                cli:     cli,
-               key:     kind.key(),
-               kind:    kind,
-               handler: handler,
+               key:     wc.key,
+               kind:    wc.kind,
+               handler: wc.handler,
                closer:  run.NewCloser(1),
                l:       l,
        }
diff --git a/banyand/metadata/schema/watcher_test.go 
b/banyand/metadata/schema/watcher_test.go
index 04d22194..a796c4bb 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -28,6 +28,9 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
@@ -74,10 +77,37 @@ var _ = ginkgo.Describe("Watcher", func() {
        var (
                mockedObj *mockedHandler
                watcher   *watcher
+               server    embeddedetcd.Server
+               registry  *etcdSchemaRegistry
        )
 
        ginkgo.BeforeEach(func() {
                mockedObj = newMockedHandler()
+               gomega.Expect(logger.Init(logger.Logging{
+                       Env:   "dev",
+                       Level: flags.LogLevel,
+               })).To(gomega.Succeed())
+               ports, err := test.AllocateFreePorts(2)
+               if err != nil {
+                       panic("fail to find free ports")
+               }
+               endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", 
ports[0])}
+               server, err = embeddedetcd.NewServer(
+                       embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+                       embeddedetcd.RootDir(randomTempDir()))
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               <-server.ReadyNotify()
+               schemaRegistry, err := NewEtcdSchemaRegistry(
+                       Namespace("test"),
+                       ConfigureServerEndpoints(endpoints),
+               )
+               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+               registry = schemaRegistry.(*etcdSchemaRegistry)
+       })
+       ginkgo.AfterEach(func() {
+               registry.Close()
+               server.Close()
+               <-server.StopNotify()
        })
 
        ginkgo.It("should handle all existing key-value pairs on initial load", 
func() {
diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go
index 46cbb0da..b5af797c 100644
--- a/banyand/observability/pprof.go
+++ b/banyand/observability/pprof.go
@@ -20,6 +20,7 @@ package observability
 import (
        "net/http"
        "net/http/pprof"
+       "sync"
        "time"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,6 +44,7 @@ type pprofService struct {
        svr        *http.Server
        closer     *run.Closer
        listenAddr string
+       svrMux     sync.Mutex
 }
 
 func (p *pprofService) FlagSet() *run.FlagSet {
@@ -70,6 +72,8 @@ func (p *pprofService) Serve() run.StopNotify {
        mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
        mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
        mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+       p.svrMux.Lock()
+       defer p.svrMux.Unlock()
        p.svr = &http.Server{
                Addr:              p.listenAddr,
                ReadHeaderTimeout: 3 * time.Second,
@@ -84,6 +88,10 @@ func (p *pprofService) Serve() run.StopNotify {
 }
 
 func (p *pprofService) GracefulStop() {
-       _ = p.svr.Close()
+       p.svrMux.Lock()
+       defer p.svrMux.Unlock()
+       if p.svr != nil {
+               _ = p.svr.Close()
+       }
        p.closer.CloseThenWait()
 }
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 412dc8e1..00272799 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -20,6 +20,7 @@ package observability
 import (
        "context"
        "net/http"
+       "sync"
        "time"
 
        "github.com/robfig/cron/v3"
@@ -48,6 +49,7 @@ type metricService struct {
        closer     *run.Closer
        scheduler  *timestamp.Scheduler
        listenAddr string
+       mutex      sync.Mutex
 }
 
 func (p *metricService) FlagSet() *run.FlagSet {
@@ -70,6 +72,8 @@ func (p *metricService) Name() string {
 func (p *metricService) Serve() run.StopNotify {
        p.l = logger.GetLogger(p.Name())
 
+       p.mutex.Lock()
+       defer p.mutex.Unlock()
        clock, _ := timestamp.GetClock(context.TODO())
        p.scheduler = timestamp.NewScheduler(p.l, clock)
        err := p.scheduler.Register("metrics-collector", cron.Descriptor, 
"@every 15s", func(now time.Time, logger *logger.Logger) bool {
@@ -93,6 +97,8 @@ func (p *metricService) Serve() run.StopNotify {
 }
 
 func (p *metricService) GracefulStop() {
+       p.mutex.Lock()
+       defer p.mutex.Unlock()
        if p.scheduler != nil {
                p.scheduler.Close()
        }
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 09ca07aa..c5b8dc42 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -80,6 +80,10 @@ func (l local) NewBatchPublisher() BatchPublisher {
        }
 }
 
+func (*local) GetPort() *uint32 {
+       return nil
+}
+
 type localBatchPublisher struct {
        local *bus.Bus
 }
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 8fb06418..c5c68139 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -53,6 +53,17 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                p.log.Warn().Msg("failed to cast node spec")
                return
        }
+       var hasDataRole bool
+       for _, r := range node.Roles {
+               if r == databasev1.Role_ROLE_DATA {
+                       hasDataRole = true
+                       break
+               }
+       }
+       if !hasDataRole {
+               return
+       }
+
        address := node.GrpcAddress
        if address == "" {
                p.log.Warn().Stringer("node", node).Msg("grpc address is empty")
@@ -70,7 +81,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        if _, ok := p.clients[name]; ok {
                return
        }
-       conn, err := grpc.Dial(address, 
grpc.WithTransportCredentials(insecure.NewCredentials()), 
grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithBlock())
+       conn, err := grpc.Dial(address, 
grpc.WithTransportCredentials(insecure.NewCredentials()), 
grpc.WithDefaultServiceConfig(retryPolicy))
        if err != nil {
                p.log.Error().Err(err).Msg("failed to connect to grpc server")
                return
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index bada6711..7bb8c531 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -45,6 +45,7 @@ type Client interface {
 type Server interface {
        run.Unit
        bus.Subscriber
+       GetPort() *uint32
 }
 
 // BatchPublisher is the interface for publishing data in batch.
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 78833713..2b880f9f 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -73,7 +73,9 @@ type server struct {
 
 // NewServer returns a new gRPC server.
 func NewServer() queue.Server {
-       return &server{}
+       return &server{
+               listeners: make(map[bus.Topic]bus.MessageListener),
+       }
 }
 
 func (s *server) PreRun(_ context.Context) error {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 7a9b34df..0ce3e6ce 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -38,6 +38,12 @@ import (
 
 var _ Query = (*schemaRepo)(nil)
 
+// SchemaService allows querying stream schema.
+type SchemaService interface {
+       Query
+       Close()
+}
+
 type schemaRepo struct {
        resourceSchema.Repository
        l        *logger.Logger
@@ -61,7 +67,7 @@ func newSchemaRepo(path string, metadata metadata.Repo,
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) Query {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) 
SchemaService {
        r := &schemaRepo{
                l:        l,
                metadata: metadata,
diff --git a/banyand/internal/cmd/storage.go b/pkg/cmdsetup/data.go
similarity index 67%
rename from banyand/internal/cmd/storage.go
rename to pkg/cmdsetup/data.go
index ceadaa17..7a421b12 100644
--- a/banyand/internal/cmd/storage.go
+++ b/pkg/cmdsetup/data.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package cmd
+package cmdsetup
 
 import (
        "context"
@@ -33,21 +33,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/config"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/signal"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var storageGroup = run.NewGroup("storage")
-
-const (
-       storageModeData  = "data"
-       storageModeQuery = "query"
-       storageModeMix   = "mix"
-)
-
-var flagStorageMode string
-
-func newStorageCmd() *cobra.Command {
+func newDataCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
        metaSvc, err := metadata.NewClient(ctx)
@@ -71,59 +60,46 @@ func newStorageCmd() *cobra.Command {
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
 
-       units := []run.Unit{
-               new(signal.Handler),
+       var units []run.Unit
+       units = append(units, runners...)
+       units = append(units,
+               metaSvc,
                pipeline,
                measureSvc,
                streamSvc,
                q,
                profSvc,
-       }
+       )
        if metricSvc != nil {
                units = append(units, metricSvc)
        }
-       // Meta the run Group units.
-       storageGroup.Register(units...)
+       dataGroup := run.NewGroup("data")
+       dataGroup.Register(units...)
        logging := logger.Logging{}
-       storageCmd := &cobra.Command{
-               Use:     "storage",
+       dataCmd := &cobra.Command{
+               Use:     "data",
                Version: version.Build(),
-               Short:   "Run as the storage server",
+               Short:   "Run as the data server",
                PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
                        if err = config.Load("logging", cmd.Flags()); err != 
nil {
                                return err
                        }
                        return logger.Init(logging)
                },
-               PreRun: func(cmd *cobra.Command, args []string) {
-                       if flagStorageMode == storageModeMix {
-                               return
-                       }
-                       switch flagStorageMode {
-                       case storageModeData:
-                               storageGroup.Deregister(q)
-                       case storageModeQuery:
-                               storageGroup.Deregister(streamSvc)
-                               storageGroup.Deregister(measureSvc)
-                       default:
-                               l.Fatal().Str("mode", 
flagStorageMode).Msg("unknown storage mode")
-                       }
-               },
                RunE: func(cmd *cobra.Command, args []string) (err error) {
-                       node, err := common.GenerateNode(nil, nil)
+                       node, err := common.GenerateNode(pipeline.GetPort(), 
nil)
                        if err != nil {
                                return err
                        }
-                       logger.GetLogger().Info().Msg("starting as a storage 
server")
+                       logger.GetLogger().Info().Msg("starting as a data 
server")
                        // Spawn our go routines and wait for shutdown.
-                       if err := 
storageGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, 
node)); err != nil {
-                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
storageGroup.Name()).Msg("Exit")
+                       if err := 
dataGroup.Run(context.WithValue(context.Background(), common.ContextNodeKey, 
node)); err != nil {
+                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
dataGroup.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
                        return nil
                },
        }
-       storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", 
storageModeMix, "the storage mode, one of [data, query, mix]")
-       storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet)
-       return storageCmd
+       dataCmd.Flags().AddFlagSet(dataGroup.RegisterFlags().FlagSet)
+       return dataCmd
 }
diff --git a/banyand/internal/cmd/liaison.go b/pkg/cmdsetup/liaison.go
similarity index 92%
rename from banyand/internal/cmd/liaison.go
rename to pkg/cmdsetup/liaison.go
index d09428f0..3ef2eb44 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package cmd
+package cmdsetup
 
 import (
        "context"
@@ -32,13 +32,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/signal"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var liaisonGroup = run.NewGroup("liaison")
-
-func newLiaisonCmd() *cobra.Command {
+func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
        metaSvc, err := metadata.NewClient(ctx)
@@ -54,19 +51,20 @@ func newLiaisonCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
-
-       units := []run.Unit{
-               new(signal.Handler),
+       var units []run.Unit
+       units = append(units, runners...)
+       units = append(units,
+               metaSvc,
                pipeline,
                dQuery,
                grpcServer,
                httpServer,
                profSvc,
-       }
+       )
        if metricSvc != nil {
                units = append(units, metricSvc)
        }
-       // Meta the run Group units.
+       liaisonGroup := run.NewGroup("liaison")
        liaisonGroup.Register(units...)
        liaisonCmd := &cobra.Command{
                Use:     "liaison",
diff --git a/banyand/internal/cmd/root.go b/pkg/cmdsetup/root.go
similarity index 92%
rename from banyand/internal/cmd/root.go
rename to pkg/cmdsetup/root.go
index b1535a2b..6b7df86a 100644
--- a/banyand/internal/cmd/root.go
+++ b/pkg/cmdsetup/root.go
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package cmd is an internal package defining cli commands for BanyanDB.
-package cmd
+// Package cmdsetup implements a real env in which to run tests.
+package cmdsetup
 
 import (
        "fmt"
@@ -26,6 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/config"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
@@ -39,7 +40,7 @@ const logo = `
 `
 
 // NewRoot returns a root command.
-func NewRoot() *cobra.Command {
+func NewRoot(runners ...run.Unit) *cobra.Command {
        logging := logger.Logging{}
        cmd := &cobra.Command{
                DisableAutoGenTag: true,
@@ -63,9 +64,9 @@ BanyanDB, as an observability database, aims to ingest, 
analyze and store Metric
        cmd.PersistentFlags().StringVar(&logging.Level, "logging-level", 
"info", "the root level of logging")
        cmd.PersistentFlags().StringArrayVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
        cmd.PersistentFlags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
-       cmd.AddCommand(newStandaloneCmd())
-       cmd.AddCommand(newStorageCmd())
-       cmd.AddCommand(newLiaisonCmd())
+       cmd.AddCommand(newStandaloneCmd(runners...))
+       cmd.AddCommand(newDataCmd(runners...))
+       cmd.AddCommand(newLiaisonCmd(runners...))
        return cmd
 }
 
diff --git a/banyand/internal/cmd/standalone.go b/pkg/cmdsetup/standalone.go
similarity index 94%
rename from banyand/internal/cmd/standalone.go
rename to pkg/cmdsetup/standalone.go
index 8686db07..f0dca2f2 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package cmd
+package cmdsetup
 
 import (
        "context"
@@ -34,13 +34,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
-       "github.com/apache/skywalking-banyandb/pkg/signal"
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var standaloneGroup = run.NewGroup("standalone")
-
-func newStandaloneCmd() *cobra.Command {
+func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
        pipeline := queue.Local()
@@ -65,8 +62,9 @@ func newStandaloneCmd() *cobra.Command {
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
 
-       units := []run.Unit{
-               new(signal.Handler),
+       var units []run.Unit
+       units = append(units, runners...)
+       units = append(units,
                pipeline,
                metaSvc,
                measureSvc,
@@ -75,10 +73,11 @@ func newStandaloneCmd() *cobra.Command {
                grpcServer,
                httpServer,
                profSvc,
-       }
+       )
        if metricSvc != nil {
                units = append(units, metricSvc)
        }
+       standaloneGroup := run.NewGroup("standalone")
        // Meta the run Group units.
        standaloneGroup.Register(units...)
 
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index ee8f8561..9743503f 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -77,7 +77,7 @@ func (l *Logger) Named(name ...string) *Logger {
                        level = ml
                }
        }
-       subLogger := root.l.With().Str("module", 
moduleBuilder.String()).Logger().Level(level)
+       subLogger := root.get().With().Str("module", 
moduleBuilder.String()).Logger().Level(level)
        return &Logger{module: module, modules: l.modules, development: 
l.development, Logger: &subLogger}
 }
 
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 46a75ee8..81b401d6 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -74,13 +74,19 @@ func (rl *rootLogger) set(cfg Logging) error {
        return nil
 }
 
+func (rl *rootLogger) get() *Logger {
+       rl.m.Lock()
+       defer rl.m.Unlock()
+       return rl.l
+}
+
 // GetLogger return logger with a scope.
 func GetLogger(scope ...string) *Logger {
        root.verify()
        if len(scope) < 1 {
                return root.l
        }
-       l := root.l
+       l := root.get()
        for _, v := range scope {
                l = l.Named(v)
        }
diff --git a/pkg/test/helpers/etcd.go b/pkg/test/helpers/etcd.go
new file mode 100644
index 00000000..1c0320bc
--- /dev/null
+++ b/pkg/test/helpers/etcd.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package helpers
+
+import (
+       "context"
+       "log"
+       "time"
+
+       clientv3 "go.etcd.io/etcd/client/v3"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+var defaultDialTimeout = 5 * time.Second
+
+// ListKeys lists all keys under the given prefix.
+func ListKeys(serverAddress string, prefix string) 
(map[string]*databasev1.Node, error) {
+       client, err := clientv3.New(clientv3.Config{
+               Endpoints:   []string{serverAddress},
+               DialTimeout: defaultDialTimeout,
+       })
+       if err != nil {
+               log.Fatalf("Failed to create etcd client: %v", err)
+       }
+       defer client.Close()
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
defaultDialTimeout)
+       defer cancel()
+
+       resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
+       if err != nil {
+               return nil, err
+       }
+
+       if resp.Count == 0 {
+               return nil, nil
+       }
+
+       nodeMap := make(map[string]*databasev1.Node)
+       for _, kv := range resp.Kvs {
+               md, err := schema.KindNode.Unmarshal(kv)
+               if err != nil {
+                       return nil, err
+               }
+               nodeMap[string(kv.Key)] = md.Spec.(*databasev1.Node)
+       }
+
+       return nodeMap, nil
+}
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index e7fb94c7..622f897b 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -21,6 +21,7 @@ package setup
 import (
        "context"
        "fmt"
+       "sync"
 
        "github.com/onsi/gomega"
 
@@ -31,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/pkg/cmdsetup"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
@@ -141,3 +143,22 @@ func (p *preloadService) PreRun(ctx context.Context) error 
{
 func (p *preloadService) SetMeta(meta metadata.Service) {
        p.metaSvc = meta
 }
+
+// CMD runs the command with given flags.
+func CMD(flags ...string) func() {
+       closer, closeFn := run.NewTester("closer")
+       rootCmd := cmdsetup.NewRoot(closer)
+       rootCmd.SetArgs(flags)
+       wg := sync.WaitGroup{}
+       wg.Add(1)
+       go func() {
+               defer func() {
+                       wg.Done()
+               }()
+               
gomega.Expect(rootCmd.Execute()).ShouldNot(gomega.HaveOccurred())
+       }()
+       return func() {
+               closeFn()
+               wg.Wait()
+       }
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index 85c25d6f..e7557c31 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -21,6 +21,7 @@ package stream
 import (
        "context"
        "embed"
+       "path"
 
        "google.golang.org/protobuf/encoding/protojson"
 
@@ -73,7 +74,7 @@ func PreloadSchema(ctx context.Context, e schema.Registry) 
error {
                return err
        }
        for _, entry := range entries {
-               data, err := indexRuleStore.ReadFile(indexRuleDir + "/" + 
entry.Name())
+               data, err := indexRuleStore.ReadFile(path.Join(indexRuleDir, 
entry.Name()))
                if err != nil {
                        return err
                }
diff --git a/scripts/build/lint.mk b/scripts/build/lint.mk
index 0899dfc4..394161c0 100644
--- a/scripts/build/lint.mk
+++ b/scripts/build/lint.mk
@@ -29,4 +29,4 @@ lint: $(LINTER) $(REVIVE) ## Run all linters
 
 .PHONY: format
 format: $(LINTER)
-       $(LINTER) run --fix -c $(root_dir)/.golangci-format.yml
+       $(LINTER) run --fix -c $(root_dir)/.golangci-format.yml ./...
diff --git a/test/integration/distributed/setup/node_test.go 
b/test/integration/distributed/setup/node_test.go
new file mode 100644
index 00000000..2d48b784
--- /dev/null
+++ b/test/integration/distributed/setup/node_test.go
@@ -0,0 +1,89 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_setup_test
+
+import (
+       "fmt"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+const host = "127.0.0.1"
+
+var _ = Describe("Node registration", func() {
+       It("should register/unregister a liaison node successfully", func() {
+               namespace := "liaison-test"
+               nodeHost := "liaison-1"
+               ports, err := test.AllocateFreePorts(2)
+               Expect(err).NotTo(HaveOccurred())
+               addr := fmt.Sprintf("%s:%d", host, ports[0])
+               httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+               closeFn := setup.CMD("liaison",
+                       "--namespace", namespace,
+                       "--grpc-host="+host,
+                       fmt.Sprintf("--grpc-port=%d", ports[0]),
+                       "--http-host="+host,
+                       fmt.Sprintf("--http-port=%d", ports[1]),
+                       "--http-grpc-addr="+addr,
+                       "--etcd-endpoints", etcdEndpoint,
+                       "--node-host-provider", "flag",
+                       "--node-host", nodeHost)
+               Eventually(helpers.HTTPHealthCheck(httpAddr), 
flags.EventuallyTimeout).Should(Succeed())
+               Eventually(func() (map[string]*databasev1.Node, error) {
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+               }, flags.EventuallyTimeout).Should(HaveLen(1))
+               closeFn()
+               Eventually(func() (map[string]*databasev1.Node, error) {
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+               }, flags.EventuallyTimeout).Should(BeNil())
+       })
+       It("should register/unregister a data node successfully", func() {
+               namespace := "data-test"
+               nodeHost := "data-1"
+               ports, err := test.AllocateFreePorts(1)
+               Expect(err).NotTo(HaveOccurred())
+               addr := fmt.Sprintf("%s:%d", host, ports[0])
+               closeFn := setup.CMD("data",
+                       "--namespace", namespace,
+                       "--grpc-host="+host,
+                       fmt.Sprintf("--grpc-port=%d", ports[0]),
+                       "--etcd-endpoints", etcdEndpoint,
+                       "--node-host-provider", "flag",
+                       "--node-host", nodeHost)
+               Eventually(
+                       helpers.HealthCheck(addr, 10*time.Second, 
10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
+                       flags.EventuallyTimeout).Should(Succeed())
+               Eventually(func() (map[string]*databasev1.Node, error) {
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+               }, flags.EventuallyTimeout).Should(HaveLen(1))
+               closeFn()
+               Eventually(func() (map[string]*databasev1.Node, error) {
+                       return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", namespace, nodeHost, ports[0]))
+               }, flags.EventuallyTimeout).Should(BeNil())
+       })
+})
diff --git a/banyand/metadata/schema/schema_suite_test.go 
b/test/integration/distributed/setup/setup_suite_test.go
similarity index 51%
copy from banyand/metadata/schema/schema_suite_test.go
copy to test/integration/distributed/setup/setup_suite_test.go
index 383a4f54..3a51ab8c 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/test/integration/distributed/setup/setup_suite_test.go
@@ -15,14 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package schema
+// Package integration_setup_test is a integration test suite.
+package integration_setup_test
 
 import (
        "fmt"
        "testing"
 
-       "github.com/onsi/ginkgo/v2"
-       "github.com/onsi/gomega"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -30,38 +32,44 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-func TestSchema(t *testing.T) {
-       gomega.RegisterFailHandler(ginkgo.Fail)
-       ginkgo.RunSpecs(t, "Schema Suite")
+func TestSetup(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Setup Suite")
 }
 
 var (
-       server   embeddedetcd.Server
-       registry *etcdSchemaRegistry
+       deferFunc    func()
+       goods        []gleak.Goroutine
+       etcdEndpoint string
 )
 
-var _ = ginkgo.BeforeSuite(func() {
-       gomega.Expect(logger.Init(logger.Logging{
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
                Env:   "dev",
                Level: flags.LogLevel,
-       })).To(gomega.Succeed())
+       })).To(Succeed())
+       goods = gleak.Goroutines()
        ports, err := test.AllocateFreePorts(2)
-       if err != nil {
-               panic("fail to find free ports")
-       }
-       endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
-       server, err = embeddedetcd.NewServer(
-               embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
-               embeddedetcd.RootDir(randomTempDir()))
-       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir))
+       Expect(err).ShouldNot(HaveOccurred())
        <-server.ReadyNotify()
-       schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
-       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
-       registry = schemaRegistry.(*etcdSchemaRegistry)
+       deferFunc = func() {
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+       return []byte(ep)
+}, func(ep []byte) {
+       etcdEndpoint = string(ep)
 })
 
-var _ = ginkgo.AfterSuite(func() {
-       registry.Close()
-       server.Close()
-       <-server.StopNotify()
+var _ = SynchronizedAfterSuite(func() {}, func() {
+       deferFunc()
+       Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
 })
diff --git a/test/integration/cold_query/query_suite_test.go 
b/test/integration/standalone/cold_query/query_suite_test.go
similarity index 96%
rename from test/integration/cold_query/query_suite_test.go
rename to test/integration/standalone/cold_query/query_suite_test.go
index 9f3c16df..e595e1a5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -38,11 +38,12 @@ import (
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casesstreamdata 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
 func TestIntegrationColdQuery(t *testing.T) {
        RegisterFailHandler(Fail)
-       RunSpecs(t, "Integration Query Cold Data Suite", Label("integration"))
+       RunSpecs(t, "Integration Query Cold Data Suite", 
Label(integration_standalone.Labels...))
 }
 
 var (
diff --git a/test/integration/other/measure_test.go 
b/test/integration/standalone/other/measure_test.go
similarity index 100%
rename from test/integration/other/measure_test.go
rename to test/integration/standalone/other/measure_test.go
diff --git a/test/integration/other/other_suite_test.go 
b/test/integration/standalone/other/other_suite_test.go
similarity index 87%
rename from test/integration/other/other_suite_test.go
rename to test/integration/standalone/other/other_suite_test.go
index b073fc86..c86e9d75 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/standalone/other/other_suite_test.go
@@ -25,11 +25,12 @@ import (
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
 func TestIntegrationOther(t *testing.T) {
        gm.RegisterFailHandler(g.Fail)
-       g.RunSpecs(t, "Integration Other Suite", g.Label("integration"))
+       g.RunSpecs(t, "Integration Other Suite", 
g.Label(integration_standalone.Labels...))
 }
 
 var _ = g.BeforeSuite(func() {
diff --git a/test/integration/other/property_test.go 
b/test/integration/standalone/other/property_test.go
similarity index 100%
rename from test/integration/other/property_test.go
rename to test/integration/standalone/other/property_test.go
diff --git a/test/integration/other/testdata/server_cert.pem 
b/test/integration/standalone/other/testdata/server_cert.pem
similarity index 100%
rename from test/integration/other/testdata/server_cert.pem
rename to test/integration/standalone/other/testdata/server_cert.pem
diff --git a/test/integration/other/testdata/server_key.pem 
b/test/integration/standalone/other/testdata/server_key.pem
similarity index 100%
rename from test/integration/other/testdata/server_key.pem
rename to test/integration/standalone/other/testdata/server_key.pem
diff --git a/test/integration/other/tls_test.go 
b/test/integration/standalone/other/tls_test.go
similarity index 100%
rename from test/integration/other/tls_test.go
rename to test/integration/standalone/other/tls_test.go
diff --git a/test/integration/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
similarity index 96%
rename from test/integration/query/query_suite_test.go
rename to test/integration/standalone/query/query_suite_test.go
index 0ebed247..b9852cbb 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -38,11 +38,12 @@ import (
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casesstreamdata 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
 func TestIntegrationQuery(t *testing.T) {
        RegisterFailHandler(Fail)
-       RunSpecs(t, "Integration Query Suite", Label("integration"))
+       RunSpecs(t, "Integration Query Suite", 
Label(integration_standalone.Labels...))
 }
 
 var (
diff --git a/banyand/cmd/server/main.go 
b/test/integration/standalone/standalone.go
similarity index 73%
copy from banyand/cmd/server/main.go
copy to test/integration/standalone/standalone.go
index 8906f84d..42f20c7a 100644
--- a/banyand/cmd/server/main.go
+++ b/test/integration/standalone/standalone.go
@@ -15,19 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package main implements the executable banyandb server named banyand.
-package main
+// Package standalone is a standalone integration test suite.
+package standalone
 
-import (
-       "fmt"
-       "os"
-
-       "github.com/apache/skywalking-banyandb/banyand/internal/cmd"
-)
-
-func main() {
-       if err := cmd.NewRoot().Execute(); err != nil {
-               _, _ = fmt.Fprintln(os.Stderr, err)
-               os.Exit(1)
-       }
-}
+// Labels is the labels for standalone integration test suite.
+var Labels = []string{"integration", "standalone"}

Reply via email to