This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 42c10e61 Support multiple roles for banyand (#305)
42c10e61 is described below

commit 42c10e617b9f9e2b43840da8a9f3b5cc6825bfb0
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 26 11:59:00 2023 +0800

    Support multiple roles for banyand (#305)
    
    * Support multiple roles for banyand
    
    * Separate etcd server from the schema registry
    
    
    ---------
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 CHANGES.md                                         |   1 +
 banyand/internal/cmd/{standalone.go => liaison.go} |  52 ++----
 banyand/internal/cmd/meta.go                       |  87 ++++++++++
 banyand/internal/cmd/root.go                       |   3 +
 banyand/internal/cmd/standalone.go                 |  14 +-
 banyand/internal/cmd/{standalone.go => storage.go} |  69 +++++---
 banyand/liaison/grpc/property.go                   |   2 +-
 banyand/liaison/grpc/registry.go                   |  12 +-
 banyand/liaison/grpc/server.go                     |   2 +-
 banyand/liaison/liaison.go                         |   2 +-
 banyand/metadata/{metadata.go => client.go}        |  96 ++++-------
 banyand/metadata/embeddedetcd/server.go            | 159 ++++++++++++++++++
 banyand/metadata/metadata.go                       | 181 ---------------------
 banyand/metadata/metadata_test.go                  |   1 +
 banyand/metadata/schema/etcd.go                    |  86 ++--------
 banyand/metadata/schema/etcd_test.go               |  73 ++++-----
 banyand/metadata/schema/schema.go                  |   3 -
 banyand/metadata/server.go                         |  95 +++++++++++
 banyand/query/processor.go                         |   4 +-
 banyand/query/query.go                             |   4 +-
 docs/concept/clustering.md                         |   8 +
 docs/installation.md                               |  83 +++++-----
 pkg/run/run.go                                     |  36 ++++
 pkg/test/setup/setup.go                            |   2 +-
 24 files changed, 588 insertions(+), 487 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c364de20..338d9c6b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - List all properties in a group.
 - Implement Write-ahead Logging
 - Document the clustering.
+- Support multiple roles for banyand server.
 
 ### Bugs
 
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/liaison.go
similarity index 63%
copy from banyand/internal/cmd/standalone.go
copy to banyand/internal/cmd/liaison.go
index 7a91cd77..6c883468 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/liaison.go
@@ -27,12 +27,9 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
-       "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
-       "github.com/apache/skywalking-banyandb/banyand/query"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/config"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -40,35 +37,24 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var g = run.NewGroup("standalone")
+var liaisonGroup = run.NewGroup("liaison")
 
-func newStandaloneCmd() *cobra.Command {
+func newLiaisonCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
        repo, err := discovery.NewServiceRepo(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate service repository")
        }
+       // nolint: staticcheck
        pipeline, err := queue.NewQueue(ctx, repo)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate data pipeline")
        }
-       metaSvc, err := metadata.NewService(ctx)
+       metaSvc, err := metadata.NewClient(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate stream service")
-       }
-       measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate measure service")
-       }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, 
pipeline)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate query processor")
-       }
        tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate Endpoint transport 
layer")
@@ -81,10 +67,6 @@ func newStandaloneCmd() *cobra.Command {
                new(signal.Handler),
                repo,
                pipeline,
-               metaSvc,
-               measureSvc,
-               streamSvc,
-               q,
                tcp,
                httpServer,
                profSvc,
@@ -93,12 +75,12 @@ func newStandaloneCmd() *cobra.Command {
                units = append(units, metricSvc)
        }
        // Meta the run Group units.
-       g.Register(units...)
+       liaisonGroup.Register(units...)
        logging := logger.Logging{}
-       standaloneCmd := &cobra.Command{
-               Use:     "standalone",
+       liaisonCmd := &cobra.Command{
+               Use:     "liaison",
                Version: version.Build(),
-               Short:   "Run as the standalone mode",
+               Short:   "Run as the liaison server",
                PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
                        if err = config.Load("logging", cmd.Flags()); err != 
nil {
                                return err
@@ -107,20 +89,20 @@ func newStandaloneCmd() *cobra.Command {
                },
                RunE: func(cmd *cobra.Command, args []string) (err error) {
                        fmt.Print(logo)
-                       logger.GetLogger().Info().Msg("starting as a standalone 
server")
+                       logger.GetLogger().Info().Msg("starting as a liaison 
server")
                        // Spawn our go routines and wait for shutdown.
-                       if err := g.Run(); err != nil {
-                               
logger.GetLogger().Error().Err(err).Stack().Str("name", g.Name()).Msg("Exit")
+                       if err := liaisonGroup.Run(); err != nil {
+                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
liaisonGroup.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
                        return nil
                },
        }
 
-       standaloneCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", 
"the logging")
-       standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", 
"info", "the root level of logging")
-       standaloneCmd.Flags().StringArrayVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
-       standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
-       standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
-       return standaloneCmd
+       liaisonCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the 
logging")
+       liaisonCmd.Flags().StringVar(&logging.Level, "logging-level", "info", 
"the root level of logging")
+       liaisonCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", 
nil, "the specific module")
+       liaisonCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
+       liaisonCmd.Flags().AddFlagSet(liaisonGroup.RegisterFlags().FlagSet)
+       return liaisonCmd
 }
diff --git a/banyand/internal/cmd/meta.go b/banyand/internal/cmd/meta.go
new file mode 100644
index 00000000..df7530d2
--- /dev/null
+++ b/banyand/internal/cmd/meta.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cmd
+
+import (
+       "context"
+       "fmt"
+       "os"
+
+       "github.com/spf13/cobra"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/config"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/signal"
+       "github.com/apache/skywalking-banyandb/pkg/version"
+)
+
+var metaGroup = run.NewGroup("meta")
+
+func newMetaCmd() *cobra.Command {
+       l := logger.GetLogger("bootstrap")
+       ctx := context.Background()
+       metaSvc, err := metadata.NewService(ctx)
+       if err != nil {
+               l.Fatal().Err(err).Msg("failed to initiate metadata service")
+       }
+       profSvc := observability.NewProfService()
+       metricSvc := observability.NewMetricService()
+
+       units := []run.Unit{
+               new(signal.Handler),
+               metaSvc,
+               profSvc,
+       }
+       if metricSvc != nil {
+               units = append(units, metricSvc)
+       }
+       // Meta the run Group units.
+       metaGroup.Register(units...)
+       logging := logger.Logging{}
+       metaCmd := &cobra.Command{
+               Use:     "meta",
+               Version: version.Build(),
+               Short:   "Run as the meta server",
+               PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
+                       if err = config.Load("logging", cmd.Flags()); err != 
nil {
+                               return err
+                       }
+                       return logger.Init(logging)
+               },
+               RunE: func(cmd *cobra.Command, args []string) (err error) {
+                       fmt.Print(logo)
+                       logger.GetLogger().Info().Msg("starting as a meta 
server")
+                       // Spawn our go routines and wait for shutdown.
+                       if err := metaGroup.Run(); err != nil {
+                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
metaGroup.Name()).Msg("Exit")
+                               os.Exit(-1)
+                       }
+                       return nil
+               },
+       }
+
+       metaCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the 
logging")
+       metaCmd.Flags().StringVar(&logging.Level, "logging-level", "info", "the 
root level of logging")
+       metaCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", 
nil, "the specific module")
+       metaCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", nil, 
"the level logging of logging")
+       metaCmd.Flags().AddFlagSet(metaGroup.RegisterFlags().FlagSet)
+       return metaCmd
+}
diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go
index c5dd966a..da67766d 100644
--- a/banyand/internal/cmd/root.go
+++ b/banyand/internal/cmd/root.go
@@ -44,5 +44,8 @@ BanyanDB, as an observability database, aims to ingest, 
analyze and store Metric
 `,
        }
        cmd.AddCommand(newStandaloneCmd())
+       cmd.AddCommand(newMetaCmd())
+       cmd.AddCommand(newStorageCmd())
+       cmd.AddCommand(newLiaisonCmd())
        return cmd
 }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 7a91cd77..027e0407 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -40,7 +40,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var g = run.NewGroup("standalone")
+var standaloneGroup = run.NewGroup("standalone")
 
 func newStandaloneCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
@@ -65,7 +65,7 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, 
pipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
@@ -93,12 +93,12 @@ func newStandaloneCmd() *cobra.Command {
                units = append(units, metricSvc)
        }
        // Meta the run Group units.
-       g.Register(units...)
+       standaloneGroup.Register(units...)
        logging := logger.Logging{}
        standaloneCmd := &cobra.Command{
                Use:     "standalone",
                Version: version.Build(),
-               Short:   "Run as the standalone mode",
+               Short:   "Run as the standalone server",
                PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
                        if err = config.Load("logging", cmd.Flags()); err != 
nil {
                                return err
@@ -109,8 +109,8 @@ func newStandaloneCmd() *cobra.Command {
                        fmt.Print(logo)
                        logger.GetLogger().Info().Msg("starting as a standalone 
server")
                        // Spawn our go routines and wait for shutdown.
-                       if err := g.Run(); err != nil {
-                               
logger.GetLogger().Error().Err(err).Stack().Str("name", g.Name()).Msg("Exit")
+                       if err := standaloneGroup.Run(); err != nil {
+                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
standaloneGroup.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
                        return nil
@@ -121,6 +121,6 @@ func newStandaloneCmd() *cobra.Command {
        standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", 
"info", "the root level of logging")
        standaloneCmd.Flags().StringArrayVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
        standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
-       standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
+       
standaloneCmd.Flags().AddFlagSet(standaloneGroup.RegisterFlags().FlagSet)
        return standaloneCmd
 }
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/storage.go
similarity index 64%
copy from banyand/internal/cmd/standalone.go
copy to banyand/internal/cmd/storage.go
index 7a91cd77..80c3311b 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/storage.go
@@ -25,8 +25,6 @@ import (
        "github.com/spf13/cobra"
 
        "github.com/apache/skywalking-banyandb/banyand/discovery"
-       "github.com/apache/skywalking-banyandb/banyand/liaison"
-       "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -40,20 +38,29 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/version"
 )
 
-var g = run.NewGroup("standalone")
+var storageGroup = run.NewGroup("storage")
 
-func newStandaloneCmd() *cobra.Command {
+const (
+       storageModeData  = "data"
+       storageModeQuery = "query"
+       storageModeMix   = "mix"
+)
+
+var flagStorageMode string
+
+func newStorageCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
        repo, err := discovery.NewServiceRepo(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate service repository")
        }
+       // nolint: staticcheck
        pipeline, err := queue.NewQueue(ctx, repo)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate data pipeline")
        }
-       metaSvc, err := metadata.NewService(ctx)
+       metaSvc, err := metadata.NewClient(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
@@ -65,62 +72,70 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, 
pipeline)
+       // TODO: remove streamSVC and measureSvc from query processor. To use 
metaSvc instead.
+       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate Endpoint transport 
layer")
-       }
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
-       httpServer := http.NewService()
 
        units := []run.Unit{
                new(signal.Handler),
                repo,
                pipeline,
-               metaSvc,
                measureSvc,
                streamSvc,
                q,
-               tcp,
-               httpServer,
                profSvc,
        }
        if metricSvc != nil {
                units = append(units, metricSvc)
        }
        // Meta the run Group units.
-       g.Register(units...)
+       storageGroup.Register(units...)
        logging := logger.Logging{}
-       standaloneCmd := &cobra.Command{
-               Use:     "standalone",
+       storageCmd := &cobra.Command{
+               Use:     "storage",
                Version: version.Build(),
-               Short:   "Run as the standalone mode",
+               Short:   "Run as the storage server",
                PersistentPreRunE: func(cmd *cobra.Command, args []string) (err 
error) {
                        if err = config.Load("logging", cmd.Flags()); err != 
nil {
                                return err
                        }
                        return logger.Init(logging)
                },
+               PreRun: func(cmd *cobra.Command, args []string) {
+                       if flagStorageMode == storageModeMix {
+                               return
+                       }
+                       switch flagStorageMode {
+                       case storageModeData:
+                               storageGroup.Deregister(q)
+                       case storageModeQuery:
+                               storageGroup.Deregister(streamSvc)
+                               storageGroup.Deregister(measureSvc)
+                       default:
+                               l.Fatal().Str("mode", 
flagStorageMode).Msg("unknown storage mode")
+                       }
+               },
                RunE: func(cmd *cobra.Command, args []string) (err error) {
                        fmt.Print(logo)
-                       logger.GetLogger().Info().Msg("starting as a standalone 
server")
+                       logger.GetLogger().Info().Msg("starting as a storage 
server")
                        // Spawn our go routines and wait for shutdown.
-                       if err := g.Run(); err != nil {
-                               
logger.GetLogger().Error().Err(err).Stack().Str("name", g.Name()).Msg("Exit")
+                       if err := storageGroup.Run(); err != nil {
+                               
logger.GetLogger().Error().Err(err).Stack().Str("name", 
storageGroup.Name()).Msg("Exit")
                                os.Exit(-1)
                        }
                        return nil
                },
        }
 
-       standaloneCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", 
"the logging")
-       standaloneCmd.Flags().StringVar(&logging.Level, "logging-level", 
"info", "the root level of logging")
-       standaloneCmd.Flags().StringArrayVar(&logging.Modules, 
"logging-modules", nil, "the specific module")
-       standaloneCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
-       standaloneCmd.Flags().AddFlagSet(g.RegisterFlags().FlagSet)
-       return standaloneCmd
+       storageCmd.Flags().StringVar(&logging.Env, "logging-env", "prod", "the 
logging")
+       storageCmd.Flags().StringVar(&logging.Level, "logging-level", "info", 
"the root level of logging")
+       storageCmd.Flags().StringArrayVar(&logging.Modules, "logging-modules", 
nil, "the specific module")
+       storageCmd.Flags().StringArrayVar(&logging.Levels, "logging-levels", 
nil, "the level logging of logging")
+       storageCmd.Flags().StringVarP(&flagStorageMode, "mode", "m", 
storageModeMix, "the storage mode, one of [data, query, mix]")
+       storageCmd.Flags().AddFlagSet(storageGroup.RegisterFlags().FlagSet)
+       return storageCmd
 }
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 51b0eba7..05a542f8 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -26,7 +26,7 @@ import (
 
 type propertyServer struct {
        propertyv1.UnimplementedPropertyServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 87927e7f..93e49138 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -32,7 +32,7 @@ import (
 
 type streamRegistryServer struct {
        databasev1.UnimplementedStreamRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (rs *streamRegistryServer) Create(ctx context.Context,
@@ -123,7 +123,7 @@ func groupExist(ctx context.Context, errResource error, 
metadata *commonv1.Metad
 
 type indexRuleBindingRegistryServer struct {
        databasev1.UnimplementedIndexRuleBindingRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
@@ -205,7 +205,7 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx 
context.Context, req *databa
 
 type indexRuleRegistryServer struct {
        databasev1.UnimplementedIndexRuleRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (rs *indexRuleRegistryServer) Create(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceCreateRequest) (
@@ -281,7 +281,7 @@ func (rs *indexRuleRegistryServer) Exist(ctx 
context.Context, req *databasev1.In
 
 type measureRegistryServer struct {
        databasev1.UnimplementedMeasureRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (rs *measureRegistryServer) Create(ctx context.Context, req 
*databasev1.MeasureRegistryServiceCreateRequest) (
@@ -355,7 +355,7 @@ func (rs *measureRegistryServer) Exist(ctx context.Context, 
req *databasev1.Meas
 
 type groupRegistryServer struct {
        databasev1.UnimplementedGroupRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (rs *groupRegistryServer) Create(ctx context.Context, req 
*databasev1.GroupRegistryServiceCreateRequest) (
@@ -429,7 +429,7 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, 
req *databasev1.GroupR
 
 type topNAggregationRegistryServer struct {
        databasev1.UnimplementedTopNAggregationRegistryServiceServer
-       schemaRegistry metadata.Service
+       schemaRegistry metadata.Repo
 }
 
 func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index a92ff7a6..9ead16c3 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -85,7 +85,7 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit {
+func NewServer(_ context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Repo) run.Unit {
        streamSVC := &streamService{
                discoveryService: newDiscoveryService(pipeline),
        }
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index bb59f228..539d2f45 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -29,6 +29,6 @@ import (
 )
 
 // NewEndpoint return a new endpoint which is the entry point for the database 
server.
-func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) (run.Unit, error) {
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Repo) (run.Unit, error) {
        return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/client.go
similarity index 57%
copy from banyand/metadata/metadata.go
copy to banyand/metadata/client.go
index 86697a8a..cfcfec84 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/client.go
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package metadata implements a Raft-based distributed metadata storage 
system.
-// Powered by etcd.
 package metadata
 
 import (
@@ -32,120 +30,88 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-// IndexFilter provides methods to find a specific index related objects and 
vice versa.
-type IndexFilter interface {
-       // IndexRules fetches v1.IndexRule by subject defined in 
IndexRuleBinding
-       IndexRules(ctx context.Context, subject *commonv1.Metadata) 
([]*databasev1.IndexRule, error)
-       // Subjects fetches Subject(s) by index rule
-       Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog 
commonv1.Catalog) ([]schema.Spec, error)
-}
-
-// Repo is the facade to interact with the metadata repository.
-type Repo interface {
-       IndexFilter
-       StreamRegistry() schema.Stream
-       IndexRuleRegistry() schema.IndexRule
-       IndexRuleBindingRegistry() schema.IndexRuleBinding
-       MeasureRegistry() schema.Measure
-       GroupRegistry() schema.Group
-       TopNAggregationRegistry() schema.TopNAggregation
-       PropertyRegistry() schema.Property
-}
+const flagEtcdEndpointsName = "etcd-endpoints"
 
-// Service is the metadata repository.
-type Service interface {
-       Repo
-       run.PreRunner
-       run.Service
-       run.Config
-       SchemaRegistry() schema.Registry
+// NewClient returns a new metadata client.
+func NewClient(_ context.Context) (Service, error) {
+       return &clientService{closer: run.NewCloser(1)}, nil
 }
 
-type service struct {
-       schemaRegistry  schema.Registry
-       rootDir         string
-       listenClientURL string
-       listenPeerURL   string
+type clientService struct {
+       schemaRegistry schema.Registry
+       closer         *run.Closer
+       endpoints      []string
 }
 
-func (s *service) FlagSet() *run.FlagSet {
+func (s *clientService) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
-       fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path 
of metadata")
-       fs.StringVar(&s.listenClientURL, "etcd-listen-client-url", 
"http://localhost:2379", "A URL to listen on for client traffic")
-       fs.StringVar(&s.listenPeerURL, "etcd-listen-peer-url", 
"http://localhost:2380", "A URL to listen on for peer traffic")
+       fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, 
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
        return fs
 }
 
-func (s *service) Validate() error {
-       if s.rootDir == "" {
-               return errors.New("rootDir is empty")
+func (s *clientService) Validate() error {
+       if s.endpoints == nil {
+               return errors.New("endpoints is empty")
        }
        return nil
 }
 
-func (s *service) PreRun() error {
+func (s *clientService) PreRun() error {
        var err error
-       s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
-               schema.ConfigureListener(s.listenClientURL, s.listenPeerURL),
-               schema.RootDir(s.rootDir))
+       s.schemaRegistry, err = 
schema.NewEtcdSchemaRegistry(schema.ConfigureServerEndpoints(s.endpoints))
        if err != nil {
                return err
        }
-       <-s.schemaRegistry.ReadyNotify()
        return nil
 }
 
-func (s *service) Serve() run.StopNotify {
-       return s.schemaRegistry.StoppingNotify()
+func (s *clientService) Serve() run.StopNotify {
+       return s.closer.CloseNotify()
 }
 
-func (s *service) GracefulStop() {
+func (s *clientService) GracefulStop() {
+       s.closer.Done()
+       s.closer.CloseThenWait()
        _ = s.schemaRegistry.Close()
-       <-s.schemaRegistry.StopNotify()
-}
-
-// NewService returns a new metadata repository Service.
-func NewService(_ context.Context) (Service, error) {
-       return &service{}, nil
 }
 
-func (s *service) SchemaRegistry() schema.Registry {
+func (s *clientService) SchemaRegistry() schema.Registry {
        return s.schemaRegistry
 }
 
-func (s *service) StreamRegistry() schema.Stream {
+func (s *clientService) StreamRegistry() schema.Stream {
        return s.schemaRegistry
 }
 
-func (s *service) IndexRuleRegistry() schema.IndexRule {
+func (s *clientService) IndexRuleRegistry() schema.IndexRule {
        return s.schemaRegistry
 }
 
-func (s *service) IndexRuleBindingRegistry() schema.IndexRuleBinding {
+func (s *clientService) IndexRuleBindingRegistry() schema.IndexRuleBinding {
        return s.schemaRegistry
 }
 
-func (s *service) MeasureRegistry() schema.Measure {
+func (s *clientService) MeasureRegistry() schema.Measure {
        return s.schemaRegistry
 }
 
-func (s *service) GroupRegistry() schema.Group {
+func (s *clientService) GroupRegistry() schema.Group {
        return s.schemaRegistry
 }
 
-func (s *service) TopNAggregationRegistry() schema.TopNAggregation {
+func (s *clientService) TopNAggregationRegistry() schema.TopNAggregation {
        return s.schemaRegistry
 }
 
-func (s *service) PropertyRegistry() schema.Property {
+func (s *clientService) PropertyRegistry() schema.Property {
        return s.schemaRegistry
 }
 
-func (s *service) Name() string {
+func (s *clientService) Name() string {
        return "metadata"
 }
 
-func (s *service) IndexRules(ctx context.Context, subject *commonv1.Metadata) 
([]*databasev1.IndexRule, error) {
+func (s *clientService) IndexRules(ctx context.Context, subject 
*commonv1.Metadata) ([]*databasev1.IndexRule, error) {
        bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, 
schema.ListOpt{Group: subject.Group})
        if err != nil {
                return nil, err
@@ -179,7 +145,7 @@ func (s *service) IndexRules(ctx context.Context, subject 
*commonv1.Metadata) ([
        return result, indexRuleErr
 }
 
-func (s *service) Subjects(ctx context.Context, indexRule 
*databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) {
+func (s *clientService) Subjects(ctx context.Context, indexRule 
*databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) {
        bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, 
schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()})
        if err != nil {
                return nil, err
diff --git a/banyand/metadata/embeddedetcd/server.go 
b/banyand/metadata/embeddedetcd/server.go
new file mode 100644
index 00000000..d6ac0cb5
--- /dev/null
+++ b/banyand/metadata/embeddedetcd/server.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package embeddedetcd implements an embedded etcd server.
+package embeddedetcd
+
+import (
+       "io"
+       "net/url"
+       "os"
+       "path/filepath"
+       "time"
+
+       "go.etcd.io/etcd/server/v3/embed"
+       "go.uber.org/zap"
+
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Server is the interface of etcd server.
+type Server interface {
+       io.Closer
+       ReadyNotify() <-chan struct{}
+       StopNotify() <-chan struct{}
+       StoppingNotify() <-chan struct{}
+}
+
+type server struct {
+       server *embed.Etcd
+}
+
+// Option is the option for etcd server.
+type Option func(*config)
+
+// RootDir sets the root directory of Registry.
+func RootDir(rootDir string) Option {
+       return func(config *config) {
+               config.rootDir = rootDir
+       }
+}
+
+// ConfigureListener sets peer urls of listeners.
+func ConfigureListener(lcs, lps []string) Option {
+       return func(config *config) {
+               config.listenerClientURLs = lcs
+               config.listenerPeerURLs = lps
+       }
+}
+
+type config struct {
+       // rootDir is the root directory for etcd storage
+       rootDir string
+       // listenerClientURLs is the listener for client
+       listenerClientURLs []string
+       // listenerPeerURLs is the listener for peer
+       listenerPeerURLs []string
+}
+
+func (e *server) ReadyNotify() <-chan struct{} {
+       return e.server.Server.ReadyNotify()
+}
+
+func (e *server) StopNotify() <-chan struct{} {
+       return e.server.Server.StopNotify()
+}
+
+func (e *server) StoppingNotify() <-chan struct{} {
+       return e.server.Server.StoppingNotify()
+}
+
+func (e *server) Close() error {
+       e.server.Close()
+       return nil
+}
+
+// NewServer returns a new etcd server.
+func NewServer(options ...Option) (Server, error) {
+       conf := &config{
+               rootDir:            os.TempDir(),
+               listenerClientURLs: []string{embed.DefaultListenClientURLs},
+               listenerPeerURLs:   []string{embed.DefaultListenPeerURLs},
+       }
+       for _, opt := range options {
+               opt(conf)
+       }
+       zapCfg := logger.GetLogger("etcd").ToZapConfig()
+
+       var l *zap.Logger
+       var err error
+       if l, err = zapCfg.Build(); err != nil {
+               return nil, err
+       }
+       // TODO: allow use cluster setting
+       embedConfig, err := newEmbedEtcdConfig(conf, l)
+       if err != nil {
+               return nil, err
+       }
+       e, err := embed.StartEtcd(embedConfig)
+       if err != nil {
+               return nil, err
+       }
+       if e != nil {
+               <-e.Server.ReadyNotify() // wait for e.Server to join the 
cluster
+       }
+       reg := &server{
+               server: e,
+       }
+       return reg, nil
+}
+
+func newEmbedEtcdConfig(config *config, logger *zap.Logger) (*embed.Config, 
error) {
+       cfg := embed.NewConfig()
+       cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)
+       cfg.Dir = filepath.Join(config.rootDir, "metadata")
+       observability.UpdatePath(cfg.Dir)
+       parseURLs := func(urls []string) ([]url.URL, error) {
+               uu := make([]url.URL, 0, len(urls))
+               for _, u := range urls {
+                       cURL, err := url.Parse(u)
+                       if err != nil {
+                               return nil, err
+                       }
+                       uu = append(uu, *cURL)
+               }
+               return uu, nil
+       }
+       cURLs, err := parseURLs(config.listenerClientURLs)
+       if err != nil {
+               return nil, err
+       }
+       pURLs, err := parseURLs(config.listenerPeerURLs)
+       if err != nil {
+               return nil, err
+       }
+       cfg.Name = "meta"
+       cfg.ListenClientUrls, cfg.AdvertiseClientUrls = cURLs, cURLs
+       cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = pURLs, pURLs
+       cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
+
+       cfg.BackendBatchInterval = 500 * time.Millisecond
+       cfg.BackendBatchLimit = 10000
+       cfg.MaxRequestBytes = 10 * 1024 * 1024
+       return cfg, nil
+}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 86697a8a..f9ab33d7 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -21,10 +21,6 @@ package metadata
 
 import (
        "context"
-       "errors"
-       "time"
-
-       "go.uber.org/multierr"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -60,180 +56,3 @@ type Service interface {
        run.Config
        SchemaRegistry() schema.Registry
 }
-
-type service struct {
-       schemaRegistry  schema.Registry
-       rootDir         string
-       listenClientURL string
-       listenPeerURL   string
-}
-
-func (s *service) FlagSet() *run.FlagSet {
-       fs := run.NewFlagSet("metadata")
-       fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path 
of metadata")
-       fs.StringVar(&s.listenClientURL, "etcd-listen-client-url", 
"http://localhost:2379", "A URL to listen on for client traffic")
-       fs.StringVar(&s.listenPeerURL, "etcd-listen-peer-url", 
"http://localhost:2380", "A URL to listen on for peer traffic")
-       return fs
-}
-
-func (s *service) Validate() error {
-       if s.rootDir == "" {
-               return errors.New("rootDir is empty")
-       }
-       return nil
-}
-
-func (s *service) PreRun() error {
-       var err error
-       s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
-               schema.ConfigureListener(s.listenClientURL, s.listenPeerURL),
-               schema.RootDir(s.rootDir))
-       if err != nil {
-               return err
-       }
-       <-s.schemaRegistry.ReadyNotify()
-       return nil
-}
-
-func (s *service) Serve() run.StopNotify {
-       return s.schemaRegistry.StoppingNotify()
-}
-
-func (s *service) GracefulStop() {
-       _ = s.schemaRegistry.Close()
-       <-s.schemaRegistry.StopNotify()
-}
-
-// NewService returns a new metadata repository Service.
-func NewService(_ context.Context) (Service, error) {
-       return &service{}, nil
-}
-
-func (s *service) SchemaRegistry() schema.Registry {
-       return s.schemaRegistry
-}
-
-func (s *service) StreamRegistry() schema.Stream {
-       return s.schemaRegistry
-}
-
-func (s *service) IndexRuleRegistry() schema.IndexRule {
-       return s.schemaRegistry
-}
-
-func (s *service) IndexRuleBindingRegistry() schema.IndexRuleBinding {
-       return s.schemaRegistry
-}
-
-func (s *service) MeasureRegistry() schema.Measure {
-       return s.schemaRegistry
-}
-
-func (s *service) GroupRegistry() schema.Group {
-       return s.schemaRegistry
-}
-
-func (s *service) TopNAggregationRegistry() schema.TopNAggregation {
-       return s.schemaRegistry
-}
-
-func (s *service) PropertyRegistry() schema.Property {
-       return s.schemaRegistry
-}
-
-func (s *service) Name() string {
-       return "metadata"
-}
-
-func (s *service) IndexRules(ctx context.Context, subject *commonv1.Metadata) 
([]*databasev1.IndexRule, error) {
-       bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, 
schema.ListOpt{Group: subject.Group})
-       if err != nil {
-               return nil, err
-       }
-       now := time.Now()
-       foundRules := make([]string, 0)
-       for _, binding := range bindings {
-               if binding.GetBeginAt().AsTime().After(now) ||
-                       binding.GetExpireAt().AsTime().Before(now) {
-                       continue
-               }
-               sub := binding.GetSubject()
-               if sub.Name != subject.Name {
-                       continue
-               }
-               foundRules = append(foundRules, binding.Rules...)
-       }
-       result := make([]*databasev1.IndexRule, 0, len(foundRules))
-       var indexRuleErr error
-       for _, rule := range foundRules {
-               r, getErr := s.schemaRegistry.GetIndexRule(ctx, 
&commonv1.Metadata{
-                       Name:  rule,
-                       Group: subject.Group,
-               })
-               if getErr != nil {
-                       indexRuleErr = multierr.Append(indexRuleErr, err)
-                       continue
-               }
-               result = append(result, r)
-       }
-       return result, indexRuleErr
-}
-
-func (s *service) Subjects(ctx context.Context, indexRule 
*databasev1.IndexRule, catalog commonv1.Catalog) ([]schema.Spec, error) {
-       bindings, err := s.schemaRegistry.ListIndexRuleBinding(ctx, 
schema.ListOpt{Group: indexRule.GetMetadata().GetGroup()})
-       if err != nil {
-               return nil, err
-       }
-
-       now := time.Now()
-       var subjectErr error
-       foundSubjects := make([]schema.Spec, 0)
-       for _, binding := range bindings {
-               if binding.GetBeginAt().AsTime().After(now) ||
-                       binding.GetExpireAt().AsTime().Before(now) {
-                       continue
-               }
-               sub := binding.GetSubject()
-               if sub.GetCatalog() != catalog {
-                       continue
-               }
-
-               if !contains(binding.GetRules(), 
indexRule.GetMetadata().GetName()) {
-                       continue
-               }
-
-               switch catalog {
-               case commonv1.Catalog_CATALOG_STREAM:
-                       stream, getErr := s.schemaRegistry.GetStream(ctx, 
&commonv1.Metadata{
-                               Name:  sub.GetName(),
-                               Group: indexRule.GetMetadata().GetGroup(),
-                       })
-                       if getErr != nil {
-                               subjectErr = multierr.Append(subjectErr, getErr)
-                       }
-                       foundSubjects = append(foundSubjects, stream)
-               case commonv1.Catalog_CATALOG_MEASURE:
-                       measure, getErr := s.schemaRegistry.GetMeasure(ctx, 
&commonv1.Metadata{
-                               Name:  sub.GetName(),
-                               Group: indexRule.GetMetadata().GetGroup(),
-                       })
-                       if getErr != nil {
-                               subjectErr = multierr.Append(subjectErr, getErr)
-                       }
-                       foundSubjects = append(foundSubjects, measure)
-               default:
-                       continue
-               }
-       }
-
-       return foundSubjects, subjectErr
-}
-
-func contains(s []string, e string) bool {
-       for _, a := range s {
-               if a == e {
-                       return true
-               }
-       }
-       return false
-}
diff --git a/banyand/metadata/metadata_test.go 
b/banyand/metadata/metadata_test.go
index 156480df..56eac46c 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -47,6 +47,7 @@ func Test_service_RulesBySubject(t *testing.T) {
        is.NoError(err)
        err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir})
        is.NoError(err)
+       is.NoError(s.Validate())
        err = s.PreRun()
        is.NoError(err)
        defer func() {
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 59056bf8..2c147f30 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -19,15 +19,11 @@ package schema
 
 import (
        "context"
-       "net/url"
-       "os"
-       "path/filepath"
        "sync"
        "time"
 
        "github.com/pkg/errors"
        clientv3 "go.etcd.io/etcd/client/v3"
-       "go.etcd.io/etcd/server/v3/embed"
        "go.uber.org/zap"
        "google.golang.org/grpc"
        "google.golang.org/protobuf/proto"
@@ -35,7 +31,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
-       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -61,18 +56,10 @@ type HasMetadata interface {
 // RegistryOption is the option to create Registry.
 type RegistryOption func(*etcdSchemaRegistryConfig)
 
-// RootDir sets the root directory of Registry.
-func RootDir(rootDir string) RegistryOption {
+// ConfigureServerEndpoints sets a list of the server urls.
+func ConfigureServerEndpoints(url []string) RegistryOption {
        return func(config *etcdSchemaRegistryConfig) {
-               config.rootDir = rootDir
-       }
-}
-
-// ConfigureListener sets client and peer urls of listeners.
-func ConfigureListener(lc, lp string) RegistryOption {
-       return func(config *etcdSchemaRegistryConfig) {
-               config.listenerClientURL = lc
-               config.listenerPeerURL = lp
+               config.serverEndpoints = url
        }
 }
 
@@ -86,7 +73,6 @@ func (eh *eventHandler) interestOf(kind Kind) bool {
 }
 
 type etcdSchemaRegistry struct {
-       server   *embed.Etcd
        client   *clientv3.Client
        closer   *run.Closer
        handlers []*eventHandler
@@ -94,12 +80,7 @@ type etcdSchemaRegistry struct {
 }
 
 type etcdSchemaRegistryConfig struct {
-       // rootDir is the root directory for etcd storage
-       rootDir string
-       // listenerClientURL is the listener for client
-       listenerClientURL string
-       // listenerPeerURL is the listener for peer
-       listenerPeerURL string
+       serverEndpoints []string
 }
 
 func (e *etcdSchemaRegistry) RegisterHandler(kind Kind, handler EventHandler) {
@@ -133,35 +114,21 @@ func (e *etcdSchemaRegistry) notifyDelete(metadata 
Metadata) {
        }
 }
 
-func (e *etcdSchemaRegistry) ReadyNotify() <-chan struct{} {
-       return e.server.Server.ReadyNotify()
-}
-
-func (e *etcdSchemaRegistry) StopNotify() <-chan struct{} {
-       return e.server.Server.StopNotify()
-}
-
-func (e *etcdSchemaRegistry) StoppingNotify() <-chan struct{} {
-       return e.server.Server.StoppingNotify()
-}
-
 func (e *etcdSchemaRegistry) Close() error {
+       e.closer.Done()
        e.closer.CloseThenWait()
-       _ = e.client.Close()
-       e.server.Close()
-       return nil
+       return e.client.Close()
 }
 
 // NewEtcdSchemaRegistry returns a Registry powered by Etcd.
 func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
-       registryConfig := &etcdSchemaRegistryConfig{
-               rootDir:           os.TempDir(),
-               listenerClientURL: embed.DefaultListenClientURLs,
-               listenerPeerURL:   embed.DefaultListenPeerURLs,
-       }
+       registryConfig := &etcdSchemaRegistryConfig{}
        for _, opt := range options {
                opt(registryConfig)
        }
+       if registryConfig.serverEndpoints == nil {
+               return nil, errors.New("server address is not set")
+       }
        zapCfg := logger.GetLogger("etcd").ToZapConfig()
 
        var l *zap.Logger
@@ -169,18 +136,9 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
        if l, err = zapCfg.Build(); err != nil {
                return nil, err
        }
-       // TODO: allow use cluster setting
-       embedConfig := newStandaloneEtcdConfig(registryConfig, l)
-       e, err := embed.StartEtcd(embedConfig)
-       if err != nil {
-               return nil, err
-       }
-       if e != nil {
-               <-e.Server.ReadyNotify() // wait for e.Server to join the 
cluster
-       }
 
        config := clientv3.Config{
-               Endpoints:            
[]string{e.Config().AdvertiseClientUrls[0].String()},
+               Endpoints:            registryConfig.serverEndpoints,
                DialTimeout:          5 * time.Second,
                DialKeepAliveTime:    30 * time.Second,
                DialKeepAliveTimeout: 10 * time.Second,
@@ -192,9 +150,8 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
                return nil, err
        }
        reg := &etcdSchemaRegistry{
-               server: e,
                client: client,
-               closer: run.NewCloser(0),
+               closer: run.NewCloser(1),
        }
        return reg, nil
 }
@@ -397,22 +354,3 @@ func incrementLastByte(key string) string {
        bb[len(bb)-1]++
        return string(bb)
 }
-
-func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger 
*zap.Logger) *embed.Config {
-       cfg := embed.NewConfig()
-       cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)
-       cfg.Dir = filepath.Join(config.rootDir, "metadata")
-       observability.UpdatePath(cfg.Dir)
-       cURL, _ := url.Parse(config.listenerClientURL)
-       pURL, _ := url.Parse(config.listenerPeerURL)
-
-       cfg.ClusterState = "new"
-       cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*cURL}, 
[]url.URL{*cURL}
-       cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*pURL}, 
[]url.URL{*pURL}
-       cfg.InitialCluster = ",default=" + pURL.String()
-
-       cfg.BackendBatchInterval = 500 * time.Millisecond
-       cfg.BackendBatchLimit = 10000
-       cfg.MaxRequestBytes = 10 * 1024 * 1024
-       return cfg
-}
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index 4b8136a8..4d9a647c 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -34,6 +34,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
@@ -118,30 +119,34 @@ func randomTempDir() string {
        return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", 
uuid.New().String()))
 }
 
-func useRandomTempDir() RegistryOption {
-       return func(config *etcdSchemaRegistryConfig) {
-               config.rootDir = randomTempDir()
+func initServerAndRegister(t *testing.T) (Registry, func()) {
+       req := require.New(t)
+       ports, err := test.AllocateFreePorts(2)
+       if err != nil {
+               panic("fail to find free ports")
        }
-}
-
-func useRandomPort() RegistryOption {
-       return func(config *etcdSchemaRegistryConfig) {
-               ports, err := test.AllocateFreePorts(2)
-               if err != nil {
-                       panic("fail to find free ports")
-               }
-               config.listenerClientURL, config.listenerPeerURL = 
fmt.Sprintf("http://127.0.0.1:%d", ports[0]), 
fmt.Sprintf("http://127.0.0.1:%d", ports[1])
+       endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener(endpoints, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(randomTempDir()))
+       req.NoError(err)
+       req.NotNil(server)
+       <-server.ReadyNotify()
+       schemaRegistry, err := 
NewEtcdSchemaRegistry(ConfigureServerEndpoints(endpoints))
+       req.NoError(err)
+       req.NotNil(server)
+       return schemaRegistry, func() {
+               server.Close()
+               <-server.StopNotify()
+               schemaRegistry.Close()
        }
 }
 
 func Test_Etcd_Entity_Get(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
-       tester.NoError(err)
-       tester.NotNil(registry)
-       defer registry.Close()
-
-       err = preloadSchema(registry)
+       registry, closer := initServerAndRegister(t)
+       defer closer()
+       err := preloadSchema(registry)
        tester.NoError(err)
 
        tests := []struct {
@@ -228,12 +233,10 @@ func Test_Etcd_Entity_Get(t *testing.T) {
 
 func Test_Etcd_Entity_List(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
-       tester.NoError(err)
-       tester.NotNil(registry)
-       defer registry.Close()
+       registry, closer := initServerAndRegister(t)
+       defer closer()
 
-       err = preloadSchema(registry)
+       err := preloadSchema(registry)
        tester.NoError(err)
 
        tests := []struct {
@@ -310,12 +313,10 @@ func Test_Etcd_Entity_List(t *testing.T) {
 
 func Test_Etcd_Delete(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
-       tester.NoError(err)
-       tester.NotNil(registry)
-       defer registry.Close()
+       registry, closer := initServerAndRegister(t)
+       defer closer()
 
-       err = preloadSchema(registry)
+       err := preloadSchema(registry)
        tester.NoError(err)
 
        tests := []struct {
@@ -379,12 +380,10 @@ func Test_Etcd_Delete(t *testing.T) {
 
 func Test_Notify(t *testing.T) {
        req := require.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
-       req.NoError(err)
-       req.NotNil(registry)
-       defer registry.Close()
+       registry, closer := initServerAndRegister(t)
+       defer closer()
 
-       err = preloadSchema(registry)
+       err := preloadSchema(registry)
        req.NoError(err)
 
        tests := []struct {
@@ -524,12 +523,10 @@ func Test_Notify(t *testing.T) {
 
 func Test_Etcd_Entity_Update(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
-       tester.NoError(err)
-       tester.NotNil(registry)
-       defer registry.Close()
+       registry, closer := initServerAndRegister(t)
+       defer closer()
 
-       err = preloadSchema(registry)
+       err := preloadSchema(registry)
        tester.NoError(err)
 
        tests := []struct {
diff --git a/banyand/metadata/schema/schema.go 
b/banyand/metadata/schema/schema.go
index e26ad943..f9614938 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -61,9 +61,6 @@ type ListOpt struct {
 // Registry allowing depositing resources.
 type Registry interface {
        io.Closer
-       ReadyNotify() <-chan struct{}
-       StopNotify() <-chan struct{}
-       StoppingNotify() <-chan struct{}
        Stream
        IndexRule
        IndexRuleBinding
diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go
new file mode 100644
index 00000000..ce004e3c
--- /dev/null
+++ b/banyand/metadata/server.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metadata
+
+import (
+       "context"
+       "errors"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+type server struct {
+       Service
+       metaServer      embeddedetcd.Server
+       rootDir         string
+       listenClientURL []string
+       listenPeerURL   []string
+}
+
+func (s *server) Name() string {
+       return "metadata"
+}
+
+func (s *server) FlagSet() *run.FlagSet {
+       fs := run.NewFlagSet("metadata")
+       fs.StringVar(&s.rootDir, "metadata-root-path", "/tmp", "the root path 
of metadata")
+       fs.StringArrayVar(&s.listenClientURL, "etcd-listen-client-url", 
[]string{"http://localhost:2379"}, "A URL to listen on for client traffic")
+       fs.StringArrayVar(&s.listenPeerURL, "etcd-listen-peer-url", 
[]string{"http://localhost:2380"}, "A URL to listen on for peer traffic")
+       return fs
+}
+
+func (s *server) Validate() error {
+       if s.rootDir == "" {
+               return errors.New("rootDir is empty")
+       }
+       if s.listenClientURL == nil {
+               return errors.New("listenClientURL is empty")
+       }
+       if s.listenPeerURL == nil {
+               return errors.New("listenPeerURL is empty")
+       }
+       if err := s.Service.FlagSet().Set(flagEtcdEndpointsName,
+               strings.Join(s.listenClientURL, ",")); err != nil {
+               return err
+       }
+       return s.Service.Validate()
+}
+
+func (s *server) PreRun() error {
+       var err error
+       s.metaServer, err = 
embeddedetcd.NewServer(embeddedetcd.RootDir(s.rootDir), 
embeddedetcd.ConfigureListener(s.listenClientURL, s.listenPeerURL))
+       if err != nil {
+               return err
+       }
+       <-s.metaServer.ReadyNotify()
+       return s.Service.PreRun()
+}
+
+func (s *server) Serve() run.StopNotify {
+       return s.metaServer.StoppingNotify()
+}
+
+func (s *server) GracefulStop() {
+       s.Service.GracefulStop()
+       s.metaServer.Close()
+       <-s.metaServer.StopNotify()
+}
+
+// NewService returns a new metadata repository Service.
+func NewService(ctx context.Context) (Service, error) {
+       s := &server{}
+       var err error
+       s.Service, err = NewClient(ctx)
+       if err != nil {
+               return nil, err
+       }
+       return s, nil
+}
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 12c7a106..2884f8d2 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -27,7 +27,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -54,8 +53,7 @@ var (
 type queryService struct {
        log *logger.Logger
        // TODO: remove the metaService once 
https://github.com/apache/skywalking/issues/10121 is fixed.
-       metaService metadata.Service
-       serviceRepo discovery.ServiceRepo
+       metaService metadata.Repo
        pipeline    queue.Queue
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 3e4eab1e..fbff5030 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -21,7 +21,6 @@ package query
 import (
        "context"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -31,11 +30,10 @@ import (
 
 // NewService return a new query service.
 func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service,
-       metaService metadata.Service, serviceRepo discovery.ServiceRepo, 
pipeline queue.Queue,
+       metaService metadata.Repo, pipeline queue.Queue,
 ) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
-               serviceRepo: serviceRepo,
                pipeline:    pipeline,
        }
        // measure query processor
diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md
index 82b5859d..733f14e8 100644
--- a/docs/concept/clustering.md
+++ b/docs/concept/clustering.md
@@ -35,6 +35,14 @@ BanyanDB integrates multiple roles into a single process in 
the standalone mode,
 
 In this mode, the single process performs the roles of the Liaison Node, Query 
Node, Data Node, and Meta Node. It receives requests, maintains metadata, 
processes queries, and handles data, all within a unified setup.
 
+### 1.6 Mix Mode in Storage Nodes
+
+Query nodes and data nodes are implemented by a same executable binary, 
Storage Node. With the flag "mode", the storage node can be started as a query 
node or a data node. The default mode is "mix", which means the storage node is 
both a query node and a data node.
+
+If the workload of query is high, you can start more storage nodes with the 
flag "mode" set to "query". If the workload of write is high, you can start 
more storage nodes with the flag "mode" set to "data".
+
+Or you can start storage nodes with the flag "mode" set to "mix" to balance 
the workload of query and write.
+
 ## 2. Communication within a Cluster
 
 All nodes within a BanyanDB cluster communicate with other nodes according to 
their roles:
diff --git a/docs/installation.md b/docs/installation.md
index 6e3ef2d2..d29ab213 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -92,7 +92,10 @@ Usage:
 Available Commands:
   completion  generate the autocompletion script for the specified shell
   help        Help about any command
-  standalone  Run as the standalone mode
+  liaison     Run as the liaison server
+  meta        Run as the meta server
+  standalone  Run as the standalone server
+  storage     Run as the storage server
 
 Flags:
   -h, --help      help for this command
@@ -101,7 +104,7 @@ Flags:
 Use " [command] --help" for more information about a command.
 ```
 
-Banyand is running as a standalone process by
+Banyand is running as a standalone server by
 
 ```shell
 $ ./banyand-server standalone
@@ -119,46 +122,44 @@ $ ./banyand-server standalone
 
 The banyand-server would be listening on the `0.0.0.0:17912` if no errors 
occurred.
 
-To discover more options to configure the banyand by
+## Setup Multiple Banyand as Cluster(TBD)
+
+### Setup Standalone Nodes
+
+The standalone node is running as a standalone process by
 
 ```shell
-$ ./banyand-server standalone -h
-Usage:
-   standalone [flags]
+$ ./banyand-server standalone <flags> -id n1
+$ ./banyand-server standalone <flags> -id n2
+$ ./banyand-server standalone <flags> -id n3
+```
 
-Flags:
-      --addr string                          the address of banyand listens 
(default ":17912")
-      --cert-file string                     the TLS cert file
-      --etcd-listen-client-url string        A URL to listen on for client 
traffic (default "http://localhost:2379")
-      --etcd-listen-peer-url string          A URL to listen on for peer 
traffic (default "http://localhost:2380")
-  -h, --help                                 help for standalone
-      --http-addr string                     listen addr for http (default 
":17913")
-      --http-cert-file string                the TLS cert file of http server
-      --http-grpc-addr string                http server redirect grpc 
requests to this address (default "localhost:17912")
-      --http-grpc-cert-file string           the grpc TLS cert file if grpc 
server enables tls
-      --http-key-file string                 the TLS key file of http server
-      --http-tls                             connection uses TLS if true, else 
plain HTTP
-      --key-file string                      the TLS key file
-      --logging-env string                   the logging (default "prod")
-      --logging-level string                 the root level of logging 
(default "info")
-      --logging-levels stringArray           the level logging of logging
-      --logging-modules stringArray          the specific module
-      --max-recv-msg-size bytes              the size of max receiving message 
(default 10.00MiB)
-      --measure-buffer-size bytes            block buffer size (default 
4.00MiB)
-      --measure-encoder-buffer-size bytes    block fields buffer size (default 
12.00MiB)
-      --measure-idx-batch-wait-sec int       index batch wait in second 
(default 1)
-      --measure-root-path string             the root path of database 
(default "/tmp")
-      --measure-seriesmeta-mem-size bytes    series metadata memory size 
(default 1.00MiB)
-      --metadata-root-path string            the root path of metadata 
(default "/tmp")
-  -n, --name string                          name of this service (default 
"standalone")
-      --observability-listener-addr string   listen addr for observability 
(default ":2121")
-      --pprof-listener-addr string           listen addr for pprof (default 
":6060")
-      --show-rungroup-units                  show rungroup units
-      --stream-block-buffer-size bytes       block buffer size (default 
8.00MiB)
-      --stream-global-index-mem-size bytes   global index memory size (default 
2.00MiB)
-      --stream-idx-batch-wait-sec int        index batch wait in second 
(default 1)
-      --stream-root-path string              the root path of database 
(default "/tmp")
-      --stream-seriesmeta-mem-size bytes     series metadata memory size 
(default 1.00MiB)
-      --tls                                  connection uses TLS if true, else 
plain TCP
-  -v, --version                              version for standalone
+The standalone node would be listening on the `<ports>` if no errors occurred.
+
+### Setup Role-Based Nodes
+
+The meta nodes should boot up first to provide the metadata service for the 
whole cluster. The meta node is running as a standalone process by
+
+```shell
+$ ./banyand-server meta <flags>
+```
+
+The meta node would be listening on the `<ports>` if no errors occurred.
+
+
+Data nodes, query nodes and liaison nodes are running as independent processes 
by
+
+```shell
+$ ./banyand-server storage --mode data <flags>
+$ ./banyand-server storage --mode query <flags>
+$ ./banyand-server liaison <flags>
+```
+
+The data node, query node and liaison node would be listening on the `<ports>` 
if no errors occurred.
+
+If you want to use a `mix` mode instead of separate query and data nodes, you 
can run the banyand-server as processes by
+
+```shell
+$ ./banyand-server storage <flags>
+$ ./banyand-server liaison <flags>
 ```
diff --git a/pkg/run/run.go b/pkg/run/run.go
index c45e59a0..4630ff07 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -176,6 +176,42 @@ func (g *Group) Register(units ...Unit) []bool {
        return hasRegistered
 }
 
+// Deregister removes the provided Units from the Group. Each Unit is compared
+// by interface equality against the entries of g.c (Config), g.p (PreRunner)
+// and g.s (Service); every matching slot is set to nil in place, not removed.
+// The returned slice has one entry per provided Unit: true if at least one
+// slot was cleared for it, false if nothing matched and the Unit was ignored.
+func (g *Group) Deregister(units ...Unit) []bool {
+       hasDeregistered := make([]bool, len(units))
+       for idx := range units {
+               if c, ok := units[idx].(Config); ok {
+                       for i := range g.c { // no early exit: every matching entry is cleared
+                               if g.c[i] == c {
+                                       g.c[i] = nil // slot is cleared in place; len(g.c) is unchanged
+                                       hasDeregistered[idx] = true
+                               }
+                       }
+               }
+               if p, ok := units[idx].(PreRunner); ok {
+                       for i := range g.p {
+                               if g.p[i] == p {
+                                       g.p[i] = nil
+                                       hasDeregistered[idx] = true
+                               }
+                       }
+               }
+               if s, ok := units[idx].(Service); ok {
+                       for i := range g.s {
+                               if g.s[i] == s {
+                                       g.s[i] = nil
+                                       hasDeregistered[idx] = true
+                               }
+                       }
+               }
+       }
+       return hasDeregistered
+}
+
 // RegisterFlags returns FlagSet contains Flags in all modules.
 func (g *Group) RegisterFlags() *FlagSet {
        // run configuration stage
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 1c6dd77a..7b654471 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -93,7 +93,7 @@ func modules(schemaLoaders []SchemaLoader, flags []string) 
func() {
        measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, 
pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Query` module
-       q, err := query.NewService(context.TODO(), streamSvc, measureSvc, 
metaSvc, repo, pipeline)
+       q, err := query.NewService(context.TODO(), streamSvc, measureSvc, 
metaSvc, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
        httpServer := http.NewService()

Reply via email to