This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a955dad0 Drop discovery module (#311)
a955dad0 is described below

commit a955dad0ff8f5ded48c1479f29c833b714136581
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Aug 2 14:26:30 2023 +0800

    Drop discovery module (#311)
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                    |   1 +
 README.md                                     |   1 -
 api/event/doc.go                              |  19 --
 api/event/measure.go                          |  43 -----
 api/event/stream.go                           |  43 -----
 api/proto/banyandb/database/v1/database.proto |   7 +-
 banyand/discovery/discovery.go                |  78 --------
 banyand/internal/cmd/liaison.go               |  10 +-
 banyand/internal/cmd/standalone.go            |  14 +-
 banyand/internal/cmd/storage.go               |  12 +-
 banyand/liaison/grpc/discovery.go             | 252 ++++++++++++++++++++------
 banyand/liaison/grpc/registry_test.go         |   9 +-
 banyand/liaison/grpc/server.go                |  39 +---
 banyand/liaison/liaison.go                    |   5 +-
 banyand/measure/measure.go                    |   7 -
 banyand/measure/measure_suite_test.go         |  20 +-
 banyand/measure/metadata.go                   |   7 +-
 banyand/measure/metadata_test.go              |   7 -
 banyand/measure/service.go                    |  27 +--
 banyand/metadata/allocator.go                 |  90 +++++++++
 banyand/metadata/client.go                    |  17 +-
 banyand/metadata/metadata.go                  |   2 +
 banyand/metadata/schema/checker.go            |  14 ++
 banyand/metadata/schema/node.go               |  53 ++++++
 banyand/metadata/schema/schema.go             |  60 +++++-
 banyand/metadata/schema/shard.go              |  76 ++++++++
 banyand/queue/local.go                        |   2 -
 banyand/queue/queue.go                        |   6 +-
 banyand/stream/metadata.go                    |   7 +-
 banyand/stream/metadata_test.go               |   7 -
 banyand/stream/service.go                     |  24 +--
 banyand/stream/stream.go                      |   7 -
 banyand/stream/stream_suite_test.go           |  21 +--
 dist/LICENSE                                  |   6 +-
 docs/api-reference.md                         |   1 +
 go.mod                                        |   6 +-
 go.sum                                        |  12 +-
 pkg/schema/metadata.go                        | 133 ++------------
 pkg/test/setup/setup.go                       |  19 +-
 39 files changed, 592 insertions(+), 572 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 338d9c6b..bbdec150 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,7 @@ Release Notes.
 ### Chores
 
 - Bump several dependencies and tools.
+- Drop redundant "discovery" module from banyand. "metadata" module is enough to play the node and shard discovery role.
 
 ## 0.4.0
 
diff --git a/README.md b/README.md
index 83a32c53..73ae9c7c 100644
--- a/README.md
+++ b/README.md
@@ -29,7 +29,6 @@ The database research community usually uses [RUM conjecture](http://daslab.seas
 - [ ] Sharding
 - [ ] Load balance
 - [ ] Distributed query optimizer
-- [ ] Node discovery
 - [ ] Data queue
 
 ### Data processor
diff --git a/api/event/doc.go b/api/event/doc.go
deleted file mode 100644
index 8a9656c7..00000000
--- a/api/event/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package event contains metadata syncing topics.
-package event
diff --git a/api/event/measure.go b/api/event/measure.go
deleted file mode 100644
index 68516d6c..00000000
--- a/api/event/measure.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
-       "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
-)
-
-var (
-       // MeasureShardEventKindVersion is the version tag of measure shard 
event kind.
-       MeasureShardEventKindVersion = common.KindVersion{
-               Version: "v1",
-               Kind:    "measure-event-shard",
-       }
-
-       // MeasureTopicShardEvent is the measure shard event publishing topic.
-       MeasureTopicShardEvent = 
bus.UniTopic(MeasureShardEventKindVersion.String())
-
-       // MeasureEntityEventKindVersion is the version tag of measure entity 
kind.
-       MeasureEntityEventKindVersion = common.KindVersion{
-               Version: "v1",
-               Kind:    "measure-event-entity",
-       }
-
-       // MeasureTopicEntityEvent is the measure entity event publishing topic.
-       MeasureTopicEntityEvent = 
bus.UniTopic(MeasureEntityEventKindVersion.String())
-)
diff --git a/api/event/stream.go b/api/event/stream.go
deleted file mode 100644
index e39f01fb..00000000
--- a/api/event/stream.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package event
-
-import (
-       "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
-)
-
-var (
-       // StreamShardEventKindVersion is the version tag of stream shard 
entity kind.
-       StreamShardEventKindVersion = common.KindVersion{
-               Version: "v1",
-               Kind:    "stream-event-shard",
-       }
-
-       // StreamTopicShardEvent is the stream entity event publishing topic.
-       StreamTopicShardEvent = 
bus.UniTopic(StreamShardEventKindVersion.String())
-
-       // StreamEntityEventKindVersion is the version tag of stream entity 
kind.
-       StreamEntityEventKindVersion = common.KindVersion{
-               Version: "v1",
-               Kind:    "stream-event-entity",
-       }
-
-       // StreamTopicEntityEvent is the stream entity event publishing topic.
-       StreamTopicEntityEvent = 
bus.UniTopic(StreamEntityEventKindVersion.String())
-)
diff --git a/api/proto/banyandb/database/v1/database.proto b/api/proto/banyandb/database/v1/database.proto
index 2e2ac78b..c530b60b 100644
--- a/api/proto/banyandb/database/v1/database.proto
+++ b/api/proto/banyandb/database/v1/database.proto
@@ -27,9 +27,10 @@ option java_package = "org.apache.skywalking.banyandb.database.v1";
 
 message Node {
   string id = 1;
-  string addr = 2;
-  google.protobuf.Timestamp updated_at = 3;
-  google.protobuf.Timestamp created_at = 4;
+  common.v1.Metadata metadata = 2;
+  string addr = 3;
+  google.protobuf.Timestamp updated_at = 4;
+  google.protobuf.Timestamp created_at = 5;
 }
 
 message Shard {
diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go
deleted file mode 100644
index 837ff4ec..00000000
--- a/banyand/discovery/discovery.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Package discovery implements the service discovery.
-package discovery
-
-import (
-       "context"
-
-       "github.com/apache/skywalking-banyandb/pkg/bus"
-       "github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-// ServiceRepo provides service subscripting and publishing.
-//
-//go:generate mockgen -destination=./discovery_mock.go -package=discovery 
github.com/apache/skywalking-banyandb/banyand/discovery ServiceRepo
-type ServiceRepo interface {
-       NodeID() string
-       Name() string
-       run.Unit
-       bus.Subscriber
-       bus.Publisher
-       run.Service
-}
-
-type repo struct {
-       local  *bus.Bus
-       stopCh chan struct{}
-}
-
-func (r *repo) NodeID() string {
-       return "local"
-}
-
-func (r *repo) Name() string {
-       return "service-discovery"
-}
-
-func (r *repo) Subscribe(topic bus.Topic, listener bus.MessageListener) error {
-       return r.local.Subscribe(topic, listener)
-}
-
-func (r *repo) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, 
error) {
-       return r.local.Publish(topic, message...)
-}
-
-// NewServiceRepo return a new ServiceRepo.
-func NewServiceRepo(_ context.Context) (ServiceRepo, error) {
-       return &repo{
-               local:  bus.NewBus(),
-               stopCh: make(chan struct{}),
-       }, nil
-}
-
-func (r *repo) Serve() run.StopNotify {
-       return r.stopCh
-}
-
-func (r *repo) GracefulStop() {
-       r.local.Close()
-       if r.stopCh != nil {
-               close(r.stopCh)
-       }
-}
diff --git a/banyand/internal/cmd/liaison.go b/banyand/internal/cmd/liaison.go
index 6c883468..97dff1e7 100644
--- a/banyand/internal/cmd/liaison.go
+++ b/banyand/internal/cmd/liaison.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/spf13/cobra"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -42,12 +41,8 @@ var liaisonGroup = run.NewGroup("liaison")
 func newLiaisonCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       repo, err := discovery.NewServiceRepo(ctx)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate service repository")
-       }
        // nolint: staticcheck
-       pipeline, err := queue.NewQueue(ctx, repo)
+       pipeline, err := queue.NewQueue(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate data pipeline")
        }
@@ -55,7 +50,7 @@ func newLiaisonCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
+       tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate Endpoint transport 
layer")
        }
@@ -65,7 +60,6 @@ func newLiaisonCmd() *cobra.Command {
 
        units := []run.Unit{
                new(signal.Handler),
-               repo,
                pipeline,
                tcp,
                httpServer,
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 027e0407..9d6eccaa 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/spf13/cobra"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/measure"
@@ -45,11 +44,7 @@ var standaloneGroup = run.NewGroup("standalone")
 func newStandaloneCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       repo, err := discovery.NewServiceRepo(ctx)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate service repository")
-       }
-       pipeline, err := queue.NewQueue(ctx, repo)
+       pipeline, err := queue.NewQueue(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate data pipeline")
        }
@@ -57,11 +52,11 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline)
+       streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
-       measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline)
+       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
@@ -69,7 +64,7 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       tcp, err := liaison.NewEndpoint(ctx, pipeline, repo, metaSvc)
+       tcp, err := liaison.NewEndpoint(ctx, pipeline, metaSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate Endpoint transport 
layer")
        }
@@ -79,7 +74,6 @@ func newStandaloneCmd() *cobra.Command {
 
        units := []run.Unit{
                new(signal.Handler),
-               repo,
                pipeline,
                metaSvc,
                measureSvc,
diff --git a/banyand/internal/cmd/storage.go b/banyand/internal/cmd/storage.go
index 80c3311b..7d297fc0 100644
--- a/banyand/internal/cmd/storage.go
+++ b/banyand/internal/cmd/storage.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/spf13/cobra"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -51,12 +50,8 @@ var flagStorageMode string
 func newStorageCmd() *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       repo, err := discovery.NewServiceRepo(ctx)
-       if err != nil {
-               l.Fatal().Err(err).Msg("failed to initiate service repository")
-       }
        // nolint: staticcheck
-       pipeline, err := queue.NewQueue(ctx, repo)
+       pipeline, err := queue.NewQueue(ctx)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate data pipeline")
        }
@@ -64,11 +59,11 @@ func newStorageCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       streamSvc, err := stream.NewService(ctx, metaSvc, repo, pipeline)
+       streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
-       measureSvc, err := measure.NewService(ctx, metaSvc, repo, pipeline)
+       measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
@@ -82,7 +77,6 @@ func newStorageCmd() *cobra.Command {
 
        units := []run.Unit{
                new(signal.Handler),
-               repo,
                pipeline,
                measureSvc,
                streamSvc,
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index bb9f66e1..a2a38841 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -18,7 +18,10 @@
 package grpc
 
 import (
+       "context"
+       "fmt"
        "sync"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -26,9 +29,10 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
 )
@@ -36,20 +40,101 @@ import (
 var errNotExist = errors.New("the object doesn't exist")
 
 type discoveryService struct {
-       shardRepo  *shardRepo
-       entityRepo *entityRepo
-       pipeline   queue.Queue
-       log        *logger.Logger
+       pipeline     queue.Queue
+       metadataRepo metadata.Repo
+       shardRepo    *shardRepo
+       entityRepo   *entityRepo
+       log          *logger.Logger
+       kind         schema.Kind
 }
 
-func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+func newDiscoveryService(pipeline queue.Queue, kind schema.Kind, metadataRepo 
metadata.Repo) *discoveryService {
+       sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
+       er := &entityRepo{entitiesMap: 
make(map[identity]partition.EntityLocator)}
        return &discoveryService{
-               shardRepo:  &shardRepo{shardEventsMap: 
make(map[identity]uint32)},
-               entityRepo: &entityRepo{entitiesMap: 
make(map[identity]partition.EntityLocator)},
-               pipeline:   pipeline,
+               shardRepo:    sr,
+               entityRepo:   er,
+               pipeline:     pipeline,
+               kind:         kind,
+               metadataRepo: metadataRepo,
        }
 }
 
+func (ds *discoveryService) initialize() error {
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctx)
+       cancel()
+       if err != nil {
+               return err
+       }
+       for _, g := range groups {
+               switch ds.kind {
+               case schema.KindMeasure:
+               case schema.KindStream:
+               default:
+                       continue
+               }
+               ctx, cancel := context.WithTimeout(context.Background(), 
5*time.Second)
+               shards, innerErr := 
ds.metadataRepo.ShardRegistry().ListShard(ctx, schema.ListOpt{Group: 
g.Metadata.Name})
+               cancel()
+               if innerErr != nil {
+                       return innerErr
+               }
+               for _, s := range shards {
+                       ds.shardRepo.OnAddOrUpdate(schema.Metadata{
+                               TypeMeta: schema.TypeMeta{
+                                       Kind:  schema.KindShard,
+                                       Name:  s.Metadata.Name,
+                                       Group: s.Metadata.Group,
+                               },
+                               Spec: s,
+                       })
+               }
+
+               switch ds.kind {
+               case schema.KindMeasure:
+                       ctx, cancel = context.WithTimeout(context.Background(), 
5*time.Second)
+                       mm, innerErr := 
ds.metadataRepo.MeasureRegistry().ListMeasure(ctx, schema.ListOpt{Group: 
g.Metadata.Name})
+                       cancel()
+                       if innerErr != nil {
+                               return innerErr
+                       }
+                       for _, m := range mm {
+                               ds.entityRepo.OnAddOrUpdate(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind:  schema.KindMeasure,
+                                               Name:  m.Metadata.Name,
+                                               Group: m.Metadata.Group,
+                                       },
+                                       Spec: m,
+                               })
+                       }
+               case schema.KindStream:
+                       ctx, cancel = context.WithTimeout(context.Background(), 
5*time.Second)
+                       ss, innerErr := 
ds.metadataRepo.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: 
g.Metadata.Name})
+                       cancel()
+                       if innerErr != nil {
+                               return innerErr
+                       }
+                       for _, s := range ss {
+                               ds.entityRepo.OnAddOrUpdate(schema.Metadata{
+                                       TypeMeta: schema.TypeMeta{
+                                               Kind:  schema.KindStream,
+                                               Name:  s.Metadata.Name,
+                                               Group: s.Metadata.Group,
+                                       },
+                                       Spec: s,
+                               })
+                       }
+               default:
+                       return fmt.Errorf("unsupported kind: %d", ds.kind)
+               }
+       }
+       ds.metadataRepo.RegisterHandler(schema.KindShard, ds.shardRepo)
+       ds.metadataRepo.RegisterHandler(ds.kind, ds.entityRepo)
+       return nil
+}
+
 func (ds *discoveryService) SetLogger(log *logger.Logger) {
        ds.log = log
        ds.shardRepo.log = log
@@ -75,38 +160,46 @@ type identity struct {
        group string
 }
 
+func (i identity) String() string {
+       return fmt.Sprintf("%s/%s", i.group, i.name)
+}
+
+var _ schema.EventHandler = (*shardRepo)(nil)
+
 type shardRepo struct {
        log            *logger.Logger
        shardEventsMap map[identity]uint32
        sync.RWMutex
 }
 
-func (s *shardRepo) Rev(message bus.Message) (resp bus.Message) {
-       e, ok := message.Data().(*databasev1.ShardEvent)
-       if !ok {
-               s.log.Warn().Msg("invalid e data type")
+// OnAddOrUpdate implements schema.EventHandler.
+func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
+       if schemaMetadata.Kind != schema.KindShard {
                return
        }
-       s.setShardNum(e)
-
+       shard := schemaMetadata.Spec.(*databasev1.Shard)
+       idx := getID(shard.GetMetadata())
        if le := s.log.Debug(); le.Enabled() {
-               le.
-                       Str("action", databasev1.Action_name[int32(e.Action)]).
-                       Uint64("shardID", e.Shard.Id).
-                       Msg("received a shard e")
+               le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard 
added or updated")
        }
-       return
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       s.shardEventsMap[idx] = shard.Total
 }
 
-func (s *shardRepo) setShardNum(eventVal *databasev1.ShardEvent) {
+// OnDelete implements schema.EventHandler.
+func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) {
+       if schemaMetadata.Kind != schema.KindShard {
+               return
+       }
+       shard := schemaMetadata.Spec.(*databasev1.Shard)
+       idx := getID(shard.GetMetadata())
+       if le := s.log.Debug(); le.Enabled() {
+               le.Stringer("id", idx).Msg("shard deleted")
+       }
        s.RWMutex.Lock()
        defer s.RWMutex.Unlock()
-       idx := getID(eventVal.GetShard().GetMetadata())
-       if eventVal.Action == databasev1.Action_ACTION_PUT {
-               s.shardEventsMap[idx] = eventVal.Shard.Total
-       } else if eventVal.Action == databasev1.Action_ACTION_DELETE {
-               delete(s.shardEventsMap, idx)
-       }
+       delete(s.shardEventsMap, idx)
 }
 
 func (s *shardRepo) shardNum(idx identity) (uint32, bool) {
@@ -126,49 +219,96 @@ func getID(metadata *commonv1.Metadata) identity {
        }
 }
 
+var _ schema.EventHandler = (*entityRepo)(nil)
+
 type entityRepo struct {
        log         *logger.Logger
        entitiesMap map[identity]partition.EntityLocator
        sync.RWMutex
 }
 
-func (s *entityRepo) Rev(message bus.Message) (resp bus.Message) {
-       e, ok := message.Data().(*databasev1.EntityEvent)
-       if !ok {
-               s.log.Warn().Msg("invalid e data type")
+// OnAddOrUpdate implements schema.EventHandler.
+func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
+       var el partition.EntityLocator
+       var id identity
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
+               measure := schemaMetadata.Spec.(*databasev1.Measure)
+               el = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity)
+               id = getID(measure.GetMetadata())
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               el = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity)
+               id = getID(stream.GetMetadata())
+       default:
                return
        }
-       id := getID(e.GetSubject())
-       if le := s.log.Debug(); le.Enabled() {
+       if le := e.log.Debug(); le.Enabled() {
+               var kind string
+               switch schemaMetadata.Kind {
+               case schema.KindMeasure:
+                       kind = "measure"
+               case schema.KindStream:
+                       kind = "stream"
+               default:
+                       kind = "unknown"
+               }
                le.
-                       Str("action", databasev1.Action_name[int32(e.Action)]).
-                       Interface("subject", id).
-                       Msg("received an entity event")
+                       Str("action", "add_or_update").
+                       Stringer("subject", id).
+                       Str("kind", kind).
+                       Msg("entity added or updated")
        }
-       s.RWMutex.Lock()
-       defer s.RWMutex.Unlock()
-       switch e.Action {
-       case databasev1.Action_ACTION_PUT:
-               en := make(partition.EntityLocator, 0, 
len(e.GetEntityLocator()))
-               for _, l := range e.GetEntityLocator() {
-                       en = append(en, partition.TagLocator{
-                               FamilyOffset: int(l.FamilyOffset),
-                               TagOffset:    int(l.TagOffset),
-                       })
+       en := make(partition.EntityLocator, 0, len(el))
+       for _, l := range el {
+               en = append(en, partition.TagLocator{
+                       FamilyOffset: l.FamilyOffset,
+                       TagOffset:    l.TagOffset,
+               })
+       }
+       e.RWMutex.Lock()
+       defer e.RWMutex.Unlock()
+       e.entitiesMap[id] = en
+}
+
+// OnDelete implements schema.EventHandler.
+func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) {
+       var id identity
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
+               measure := schemaMetadata.Spec.(*databasev1.Measure)
+               id = getID(measure.GetMetadata())
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               id = getID(stream.GetMetadata())
+       default:
+               return
+       }
+       if le := e.log.Debug(); le.Enabled() {
+               var kind string
+               switch schemaMetadata.Kind {
+               case schema.KindMeasure:
+                       kind = "measure"
+               case schema.KindStream:
+                       kind = "stream"
+               default:
+                       kind = "unknown"
                }
-               s.entitiesMap[id] = en
-       case databasev1.Action_ACTION_DELETE:
-               delete(s.entitiesMap, id)
-       case databasev1.Action_ACTION_UNSPECIFIED:
-               s.log.Warn().RawJSON("event", logger.Proto(e)).Msg("ignored 
unspecified event")
+               le.
+                       Str("action", "delete").
+                       Stringer("subject", id).
+                       Str("kind", kind).
+                       Msg("entity deleted")
        }
-       return
+       e.RWMutex.Lock()
+       defer e.RWMutex.Unlock()
+       delete(e.entitiesMap, id)
 }
 
-func (s *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) {
-       s.RWMutex.RLock()
-       defer s.RWMutex.RUnlock()
-       el, ok := s.entitiesMap[id]
+func (e *entityRepo) getLocator(id identity) (partition.EntityLocator, bool) {
+       e.RWMutex.RLock()
+       defer e.RWMutex.RUnlock()
+       el, ok := e.entitiesMap[id]
        if !ok {
                return nil, false
        }
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index cc48ce86..4b12a1ac 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -31,7 +31,6 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -173,17 +172,14 @@ var _ = Describe("Registry", func() {
 })
 
 func setupForRegistry() func() {
-       // Init `Discovery` module
-       repo, err := discovery.NewServiceRepo(context.Background())
-       Expect(err).NotTo(HaveOccurred())
        // Init `Queue` module
-       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       pipeline, err := queue.NewQueue(context.TODO())
        Expect(err).NotTo(HaveOccurred())
        // Init `Metadata` module
        metaSvc, err := metadata.NewService(context.TODO())
        Expect(err).NotTo(HaveOccurred())
 
-       tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
@@ -194,7 +190,6 @@ func setupForRegistry() func() {
                "--etcd-listen-peer-url="+listenPeerURL)
        deferFunc := test.SetupModules(
                flags,
-               repo,
                pipeline,
                metaSvc,
                preloadStreamSvc,
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 9ead16c3..2bb90664 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -34,16 +34,14 @@ import (
        "google.golang.org/grpc/health/grpc_health_v1"
        "google.golang.org/grpc/status"
 
-       "github.com/apache/skywalking-banyandb/api/event"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -61,7 +59,6 @@ var (
 type server struct {
        pipeline queue.Queue
        creds    credentials.TransportCredentials
-       repo     discovery.ServiceRepo
        *indexRuleRegistryServer
        measureSVC *measureService
        log        *logger.Logger
@@ -85,16 +82,15 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Repo) run.Unit {
+func NewServer(_ context.Context, pipeline queue.Queue, schemaRegistry 
metadata.Repo) run.Unit {
        streamSVC := &streamService{
-               discoveryService: newDiscoveryService(pipeline),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindStream, schemaRegistry),
        }
        measureSVC := &measureService{
-               discoveryService: newDiscoveryService(pipeline),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindMeasure, schemaRegistry),
        }
        s := &server{
                pipeline:   pipeline,
-               repo:       repo,
                streamSVC:  streamSVC,
                measureSVC: measureSVC,
                streamRegistryServer: &streamRegistryServer{
@@ -127,30 +123,13 @@ func (s *server) PreRun() error {
        s.log = logger.GetLogger("liaison-grpc")
        s.streamSVC.setLogger(s.log)
        s.measureSVC.setLogger(s.log)
-       components := []struct {
-               discoverySVC *discoveryService
-               shardEvent   bus.Topic
-               entityEvent  bus.Topic
-       }{
-               {
-                       shardEvent:   event.StreamTopicShardEvent,
-                       entityEvent:  event.StreamTopicEntityEvent,
-                       discoverySVC: s.streamSVC.discoveryService,
-               },
-               {
-                       shardEvent:   event.MeasureTopicShardEvent,
-                       entityEvent:  event.MeasureTopicEntityEvent,
-                       discoverySVC: s.measureSVC.discoveryService,
-               },
+       components := []*discoveryService{
+               s.streamSVC.discoveryService,
+               s.measureSVC.discoveryService,
        }
        for _, c := range components {
-               c.discoverySVC.SetLogger(s.log)
-               err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
-               if err != nil {
-                       return err
-               }
-               err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
-               if err != nil {
+               c.SetLogger(s.log)
+               if err := c.initialize(); err != nil {
                        return err
                }
        }
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 539d2f45..a1b657cf 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -21,7 +21,6 @@ package liaison
 import (
        "context"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -29,6 +28,6 @@ import (
 )
 
 // NewEndpoint return a new endpoint which is the entry point for the database 
server.
-func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Repo) (run.Unit, error) {
-       return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, schemaRegistry 
metadata.Repo) (run.Unit, error) {
+       return grpc.NewServer(ctx, pipeline, schemaRegistry), nil
 }
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index a5ee5857..9fbfe653 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -31,7 +31,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +50,6 @@ type measure struct {
        processorManager       *topNProcessorManager
        name                   string
        group                  string
-       entityLocator          partition.EntityLocator
        indexRules             []*databasev1.IndexRule
        topNAggregations       []*databasev1.TopNAggregation
        maxObservedModRevision int64
@@ -98,10 +96,6 @@ func (s *measure) MaxObservedModRevision() int64 {
        return s.maxObservedModRevision
 }
 
-func (s *measure) EntityLocator() partition.EntityLocator {
-       return s.entityLocator
-}
-
 func (s *measure) Close() error {
        if s.processorManager == nil {
                return nil
@@ -111,7 +105,6 @@ func (s *measure) Close() error {
 
 func (s *measure) parseSpec() (err error) {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
-       s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), 
s.schema.GetEntity())
        s.maxObservedModRevision = 
int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), 
float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
        if s.schema.Interval != "" {
                s.interval, err = timestamp.ParseDuration(s.schema.Interval)
diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
index 704720d2..a1c829bb 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -25,9 +25,6 @@ import (
        "github.com/onsi/gomega"
        "go.uber.org/mock/gomock"
 
-       "github.com/apache/skywalking-banyandb/api/event"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -64,26 +61,15 @@ func (p *preloadMeasureService) PreRun() error {
 type services struct {
        measure         measure.Service
        metadataService metadata.Service
-       repo            *discovery.MockServiceRepo
        pipeline        queue.Queue
 }
 
 func setUp() (*services, func()) {
        ctrl := gomock.NewController(ginkgo.GinkgoT())
        gomega.Expect(ctrl).ShouldNot(gomega.BeNil())
-       // Init Discovery
-       repo := discovery.NewMockServiceRepo(ctrl)
-       repo.EXPECT().NodeID().AnyTimes()
-       repo.EXPECT().Name().AnyTimes()
-       stopCh := make(chan struct{})
-       repo.EXPECT().Serve().Return(stopCh).Times(1)
-       repo.EXPECT().GracefulStop().Do(func() { close(stopCh) }).Times(1)
-       // Both PreRun and Serve phases send events
-       repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
-       repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
 
        // Init Pipeline
-       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       pipeline, err := queue.NewQueue(context.TODO())
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        // Init Metadata Service
@@ -91,7 +77,7 @@ func setUp() (*services, func()) {
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        // Init Measure Service
-       measureService, err := measure.NewService(context.TODO(), 
metadataService, repo, pipeline)
+       measureService, err := measure.NewService(context.TODO(), 
metadataService, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
        var flags []string
@@ -106,7 +92,6 @@ func setUp() (*services, func()) {
        flags = append(flags, "--etcd-listen-client-url="+listenClientURL, 
"--etcd-listen-peer-url="+listenPeerURL)
        moduleDeferFunc := test.SetupModules(
                flags,
-               repo,
                pipeline,
                metadataService,
                preloadMeasureSvc,
@@ -115,7 +100,6 @@ func setUp() (*services, func()) {
        return &services{
                        measure:         measureService,
                        metadataService: metadataService,
-                       repo:            repo,
                        pipeline:        pipeline,
                }, func() {
                        moduleDeferFunc()
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2a577b26..a86bfcc4 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -28,10 +28,8 @@ import (
        "google.golang.org/protobuf/testing/protocmp"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
@@ -49,7 +47,7 @@ type schemaRepo struct {
        metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo,
+func newSchemaRepo(path string, metadata metadata.Repo,
        dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue, 
encoderBufferSize, bufferSize int64,
 ) schemaRepo {
        return schemaRepo{
@@ -57,11 +55,8 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRe
                metadata: metadata,
                Repository: resourceSchema.NewRepository(
                        metadata,
-                       repo,
                        l,
                        newSupplier(path, metadata, dbOpts, l, pipeline, 
encoderBufferSize, bufferSize),
-                       event.MeasureTopicShardEvent,
-                       event.MeasureTopicEntityEvent,
                ),
        }
 }
diff --git a/banyand/measure/metadata_test.go b/banyand/measure/metadata_test.go
index fdbada2f..9504c1f9 100644
--- a/banyand/measure/metadata_test.go
+++ b/banyand/measure/metadata_test.go
@@ -24,11 +24,9 @@ import (
        . "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
 
-       "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
@@ -54,7 +52,6 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
                })
                It("should close the group", func() {
-                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "sw_metric")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(deleted).Should(BeTrue())
@@ -65,8 +62,6 @@ var _ = Describe("Metadata", func() {
                })
 
                It("should add shards", func() {
-                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).AnyTimes()
-                       
svcs.repo.EXPECT().Publish(event.MeasureTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
                        groupSchema, err := 
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "sw_metric")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(groupSchema).ShouldNot(BeNil())
@@ -95,7 +90,6 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
                })
                It("should close the measure", func() {
-                       
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
                        deleted, err := 
svcs.metadataService.MeasureRegistry().DeleteMeasure(context.TODO(), 
&commonv1.Metadata{
                                Name:  "service_cpm_minute",
                                Group: "sw_metric",
@@ -126,7 +120,6 @@ var _ = Describe("Metadata", func() {
                        })
 
                        It("should update a new measure", func() {
-                               
svcs.repo.EXPECT().Publish(event.MeasureTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).AnyTimes()
                                // Remove the first tag from the entity
                                measureSchema.Entity.TagNames = 
measureSchema.Entity.TagNames[1:]
                                entitySize := len(measureSchema.Entity.TagNames)
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 04c1b9ce..6e46f841 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -61,9 +60,7 @@ type service struct {
        writeListener          bus.MessageListener
        metadata               metadata.Repo
        pipeline               queue.Queue
-       repo                   discovery.ServiceRepo
        l                      *logger.Logger
-       stopCh                 chan struct{}
        root                   string
        dbOpts                 tsdb.DatabaseOpts
        BlockEncoderBufferSize run.Bytes
@@ -116,7 +113,7 @@ func (s *service) PreRun() error {
        }
        path := path.Join(s.root, s.Name())
        observability.UpdatePath(path)
-       s.schemaRepo = newSchemaRepo(path, s.metadata, s.repo, s.dbOpts,
+       s.schemaRepo = newSchemaRepo(path, s.metadata, s.dbOpts,
                s.l, s.pipeline, int64(s.BlockEncoderBufferSize), 
int64(s.BlockBufferSize))
        for _, g := range groups {
                if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
@@ -144,6 +141,11 @@ func (s *service) PreRun() error {
                        }
                }
        }
+       // run a serial watcher
+       go s.schemaRepo.Watcher()
+       s.metadata.
+               
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
+                       &s.schemaRepo)
 
        s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
        err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
@@ -177,31 +179,18 @@ func (s *service) sanityCheck(group resourceSchema.Group, 
measureSchema *databas
 }
 
 func (s *service) Serve() run.StopNotify {
-       _ = s.schemaRepo.NotifyAll()
-       // run a serial watcher
-       go s.schemaRepo.Watcher()
-
-       s.metadata.MeasureRegistry().
-               
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
-                       &s.schemaRepo)
-
-       return s.stopCh
+       return s.schemaRepo.StopCh()
 }
 
 func (s *service) GracefulStop() {
        s.schemaRepo.Close()
-       if s.stopCh != nil {
-               close(s.stopCh)
-       }
 }
 
 // NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, repo 
discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline 
queue.Queue) (Service, error) {
        return &service{
                metadata: metadata,
-               repo:     repo,
                pipeline: pipeline,
-               stopCh:   make(chan struct{}),
                dbOpts: tsdb.DatabaseOpts{
                        IndexGranularity: tsdb.IndexGranularitySeries,
                },
diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go
new file mode 100644
index 00000000..acf52dbf
--- /dev/null
+++ b/banyand/metadata/allocator.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metadata
+
+import (
+       "context"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var _ schema.EventHandler = (*allocator)(nil)
+
+type allocator struct {
+       schemaRegistry schema.Registry
+       l              *logger.Logger
+}
+
+func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger) 
*allocator {
+       return &allocator{
+               schemaRegistry: schemaRegistry,
+               l:              logger,
+       }
+}
+
+// OnAddOrUpdate implements EventHandler.
+func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) {
+       switch metadata.Kind {
+       case schema.KindGroup:
+               groupSchema := metadata.Spec.(*commonv1.Group)
+               if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
+                       return
+               }
+               now := time.Now()
+               nowPb := timestamppb.New(now)
+               shardNum := groupSchema.GetResourceOpts().GetShardNum()
+               syncShard := func(id uint64) error {
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                       defer cancel()
+                       return a.schemaRegistry.CreateOrUpdateShard(ctx, 
&databasev1.Shard{
+                               Id:    id,
+                               Total: shardNum,
+                               Metadata: &commonv1.Metadata{
+                                       Name: 
groupSchema.GetMetadata().GetName(),
+                               },
+                               Node: &databasev1.Node{
+                                       Id:        "TODO",
+                                       CreatedAt: nowPb,
+                                       UpdatedAt: nowPb,
+                                       Addr:      "TODO",
+                               },
+                       })
+               }
+               for i := 0; i < int(shardNum); i++ {
+                       if err := syncShard(uint64(i)); err != nil {
+                               // TODO: handle error. retry? or do a full sync?
+                               a.l.Error().Err(err).Msg("failed to sync shard")
+                       }
+               }
+       case schema.KindNode:
+               // TODO: handle node
+       default:
+               return
+       }
+}
+
+// OnDelete implements EventHandler.
+func (*allocator) OnDelete(schema.Metadata) {
+       // TODO: handle delete
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index cfcfec84..0fc967c7 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -27,6 +27,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -39,10 +40,15 @@ func NewClient(_ context.Context) (Service, error) {
 
 type clientService struct {
        schemaRegistry schema.Registry
+       alc            *allocator
        closer         *run.Closer
        endpoints      []string
 }
 
+func (s *clientService) SchemaRegistry() schema.Registry {
+       return s.schemaRegistry
+}
+
 func (s *clientService) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("metadata")
        fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, 
[]string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints")
@@ -62,6 +68,9 @@ func (s *clientService) PreRun() error {
        if err != nil {
                return err
        }
+
+       s.alc = newAllocator(s.schemaRegistry, 
logger.GetLogger(s.Name()).Named("allocator"))
+       s.schemaRegistry.RegisterHandler(schema.KindGroup|schema.KindNode, 
s.alc)
        return nil
 }
 
@@ -75,8 +84,8 @@ func (s *clientService) GracefulStop() {
        _ = s.schemaRegistry.Close()
 }
 
-func (s *clientService) SchemaRegistry() schema.Registry {
-       return s.schemaRegistry
+func (s *clientService) RegisterHandler(kind schema.Kind, handler 
schema.EventHandler) {
+       s.schemaRegistry.RegisterHandler(kind, handler)
 }
 
 func (s *clientService) StreamRegistry() schema.Stream {
@@ -107,6 +116,10 @@ func (s *clientService) PropertyRegistry() schema.Property 
{
        return s.schemaRegistry
 }
 
+func (s *clientService) ShardRegistry() schema.Shard {
+       return s.schemaRegistry
+}
+
 func (s *clientService) Name() string {
        return "metadata"
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index f9ab33d7..9bad1e72 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -46,6 +46,8 @@ type Repo interface {
        GroupRegistry() schema.Group
        TopNAggregationRegistry() schema.TopNAggregation
        PropertyRegistry() schema.Property
+       ShardRegistry() schema.Shard
+       RegisterHandler(schema.Kind, schema.EventHandler)
 }
 
 // Service is the metadata repository.
diff --git a/banyand/metadata/schema/checker.go 
b/banyand/metadata/schema/checker.go
index 4a1a8751..3d669cbc 100644
--- a/banyand/metadata/schema/checker.go
+++ b/banyand/metadata/schema/checker.go
@@ -82,6 +82,20 @@ var checkerMap = map[Kind]equalityChecker{
                        protocmp.IgnoreFields(&commonv1.Metadata{}, "id", 
"create_revision", "mod_revision"),
                        protocmp.Transform())
        },
+       KindNode: func(a, b proto.Message) bool {
+               return cmp.Equal(a, b,
+                       protocmp.IgnoreUnknown(),
+                       protocmp.IgnoreFields(&databasev1.Node{}, "updated_at"),
+                       protocmp.IgnoreFields(&commonv1.Metadata{}, "id", 
"create_revision", "mod_revision"),
+                       protocmp.Transform())
+       },
+       KindShard: func(a, b proto.Message) bool {
+               return cmp.Equal(a, b,
+                       protocmp.IgnoreUnknown(),
+                       protocmp.IgnoreFields(&databasev1.Shard{}, 
"updated_at"),
+                       protocmp.IgnoreFields(&commonv1.Metadata{}, "id", 
"create_revision", "mod_revision"),
+                       protocmp.Transform())
+       },
        KindMask: func(a, b proto.Message) bool {
                return false
        },
diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go
new file mode 100644
index 00000000..0c6dcdb1
--- /dev/null
+++ b/banyand/metadata/schema/node.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "context"
+
+       "google.golang.org/protobuf/proto"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var nodeKeyPrefix = "/nodes/"
+
+// ListNode lists the entries stored under the prefix built from role and
+// nodeKeyPrefix and returns them as nodes. An empty role is rejected with a
+// BadRequest error that names the role parameter.
+func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role Role) 
([]*databasev1.Node, error) {
+       if role == "" {
+               return nil, BadRequest("role", "role should not be empty")
+       }
+       messages, err := e.listWithPrefix(ctx, 
listPrefixesForEntity(string(role), nodeKeyPrefix), func() proto.Message {
+               return &databasev1.Node{}
+       })
+       if err != nil {
+               return nil, err
+       }
+       entities := make([]*databasev1.Node, 0, len(messages))
+       for _, message := range messages {
+               entities = append(entities, message.(*databasev1.Node))
+       }
+       return entities, nil
+}
+
+func formatNodePrefix(role Role) string {
+       return nodeKeyPrefix + string(role)
+}
+
+func formatNodeKey(role Role, id string) string {
+       return formatNodePrefix(role) + "/" + id
+}
diff --git a/banyand/metadata/schema/schema.go 
b/banyand/metadata/schema/schema.go
index f9614938..fd97a1d6 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -50,7 +50,11 @@ const (
        KindIndexRule
        KindTopNAggregation
        KindProperty
-       KindMask = KindGroup | KindStream | KindMeasure | KindIndexRuleBinding 
| KindIndexRule | KindTopNAggregation
+       KindNode
+       KindShard
+       KindMask = KindGroup | KindStream | KindMeasure |
+               KindIndexRuleBinding | KindIndexRule |
+               KindTopNAggregation | KindProperty | KindNode | KindShard
 )
 
 // ListOpt contains options to list resources.
@@ -68,6 +72,9 @@ type Registry interface {
        Group
        TopNAggregation
        Property
+       Node
+       Shard
+       RegisterHandler(Kind, EventHandler)
 }
 
 // TypeMeta defines the identity and type of an Event.
@@ -103,6 +110,10 @@ func (tm TypeMeta) Unmarshal(data []byte) (m 
proto.Message, err error) {
                m = &propertyv1.Property{}
        case KindTopNAggregation:
                m = &databasev1.TopNAggregation{}
+       case KindNode:
+               m = &databasev1.Node{}
+       case KindShard:
+               m = &databasev1.Shard{}
        default:
                return nil, errUnsupportedEntityType
        }
@@ -144,6 +155,17 @@ func (m Metadata) key() (string, error) {
                        Group: m.Group,
                        Name:  m.Name,
                }), nil
+       case KindNode:
+               r, err := strToRole(m.Group)
+               if err != nil {
+                       return "", err
+               }
+               return formatNodeKey(r, m.Name), nil
+       case KindShard:
+               return formatShardKey(&commonv1.Metadata{
+                       Group: m.Group,
+                       Name:  m.Name,
+               }), nil
        default:
                return "", errUnsupportedEntityType
        }
@@ -168,7 +190,6 @@ type Stream interface {
        CreateStream(ctx context.Context, stream *databasev1.Stream) error
        UpdateStream(ctx context.Context, stream *databasev1.Stream) error
        DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, 
error)
-       RegisterHandler(Kind, EventHandler)
 }
 
 // IndexRule allows CRUD index rule schemas in a group.
@@ -196,7 +217,6 @@ type Measure interface {
        CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
        UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
        DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, 
error)
-       RegisterHandler(Kind, EventHandler)
        TopNAggregations(ctx context.Context, metadata *commonv1.Metadata) 
([]*databasev1.TopNAggregation, error)
 }
 
@@ -226,3 +246,37 @@ type Property interface {
        ApplyProperty(ctx context.Context, property *propertyv1.Property, 
strategy propertyv1.ApplyRequest_Strategy) (bool, uint32, error)
        DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata, tags 
[]string) (bool, uint32, error)
 }
+
+// Role is the role of a node, expressed as a string; strToRole accepts only the Role* names below.
+type Role string
+
+const ( // NOTE(review): no explicit type is given, so these are untyped string constants rather than Role values — confirm intended
+       // RoleMeta is the role name of a meta node.
+       RoleMeta = "meta"
+       // RoleData is the role name of a data node.
+       RoleData = "data"
+       // RoleQuery is the role name of a query node.
+       RoleQuery = "query"
+       // RoleLiaison is the role name of a liaison node.
+       RoleLiaison = "liaison"
+)
+
+func strToRole(role string) (Role, error) { // validates a role name and converts it to a Role
+       switch role {
+       case RoleMeta, RoleData, RoleQuery, RoleLiaison: // the only accepted role names
+               return Role(role), nil
+       default:
+               return "", errors.New("invalid role") // any other string, including "", is rejected
+       }
+}
+
+// Node allows listing the nodes that have a given role.
+type Node interface {
+       ListNode(ctx context.Context, role Role) ([]*databasev1.Node, error)
+}
+
+// Shard allows creating or updating a shard and listing the shards selected by a ListOpt.
+type Shard interface {
+       CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error
+       ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error)
+}
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
new file mode 100644
index 00000000..caf8b9de
--- /dev/null
+++ b/banyand/metadata/schema/shard.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "context"
+
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var shardKeyPrefix = "/shards/" // prefix used to build shard keys (formatShardKey) and to list shards (ListShard)
+
+func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard 
*databasev1.Shard) error { // updates the shard, or creates it when the update reports ErrGRPCResourceNotFound
+       if shard.UpdatedAt != nil { // NOTE(review): refreshes only a caller-supplied timestamp; a nil UpdatedAt stays nil — confirm "!= nil" is intended
+               shard.UpdatedAt = timestamppb.Now()
+       }
+       md := Metadata{
+               TypeMeta: TypeMeta{
+                       Kind:  KindShard,
+                       Group: shard.GetMetadata().GetGroup(),
+                       Name:  shard.GetMetadata().GetName(),
+               },
+               Spec: shard,
+       }
+       err := e.update(ctx, md) // try an update first
+       if err == nil {
+               return nil
+       }
+       if errors.Is(err, ErrGRPCResourceNotFound) { // nothing to update yet: fall back to create
+               shard.CreatedAt = shard.UpdatedAt // on create, CreatedAt takes the same value as UpdatedAt
+               md.Spec = shard
+               return e.create(ctx, md)
+       }
+       return err // any other update error is returned unchanged
+}
+
+func (e *etcdSchemaRegistry) ListShard(ctx context.Context, opt ListOpt) 
([]*databasev1.Shard, error) { // lists shards for opt.Group via listWithPrefix; the group is mandatory
+       if opt.Group == "" { // an empty group is rejected with a BadRequest error before any lookup
+               return nil, BadRequest("group", "group should not be empty")
+       }
+       messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, 
shardKeyPrefix), func() proto.Message {
+               return &databasev1.Shard{} // factory for the message type handed to listWithPrefix
+       })
+       if err != nil {
+               return nil, err
+       }
+       entities := make([]*databasev1.Shard, 0, len(messages))
+       for _, message := range messages {
+               entities = append(entities, message.(*databasev1.Shard)) // unchecked assertion; presumably every message comes from the factory above — TODO confirm
+       }
+       return entities, nil
+}
+
+func formatShardKey(metadata *commonv1.Metadata) string { // builds a shard key by passing shardKeyPrefix and metadata to formatKey
+       return formatKey(shardKeyPrefix, metadata)
+}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index ec4a95e1..14a8632b 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,7 +19,6 @@
 package queue
 
 import (
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -31,7 +30,6 @@ var (
 
 type local struct {
        local  *bus.Bus
-       repo   discovery.ServiceRepo
        stopCh chan struct{}
 }
 
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 0946ecb1..e4bc0b33 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -20,7 +20,6 @@ package queue
 import (
        "context"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -35,10 +34,9 @@ type Queue interface {
        run.Service
 }
 
-// NewQueue return a new Queue which relies on the discovery service.
-func NewQueue(_ context.Context, repo discovery.ServiceRepo) (Queue, error) {
+// NewQueue returns a new Queue.
+func NewQueue(_ context.Context) (Queue, error) {
        return &local{
-               repo:   repo,
                local:  bus.NewBus(),
                stopCh: make(chan struct{}),
        }, nil
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 28e78da8..d69710b1 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -23,10 +23,8 @@ import (
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -41,7 +39,7 @@ type schemaRepo struct {
        metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo,
+func newSchemaRepo(path string, metadata metadata.Repo,
        bufferSize int64, dbOpts tsdb.DatabaseOpts, l *logger.Logger,
 ) schemaRepo {
        return schemaRepo{
@@ -49,11 +47,8 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRe
                metadata: metadata,
                Repository: resourceSchema.NewRepository(
                        metadata,
-                       repo,
                        l,
                        newSupplier(path, metadata, bufferSize, dbOpts, l),
-                       event.StreamTopicShardEvent,
-                       event.StreamTopicEntityEvent,
                ),
        }
 }
diff --git a/banyand/stream/metadata_test.go b/banyand/stream/metadata_test.go
index 3377d556..dc184d40 100644
--- a/banyand/stream/metadata_test.go
+++ b/banyand/stream/metadata_test.go
@@ -24,10 +24,8 @@ import (
        . "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
 
-       "github.com/apache/skywalking-banyandb/api/event"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
@@ -54,7 +52,6 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
                })
                It("should close the group", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
                        deleted, err := 
svcs.metadataService.GroupRegistry().DeleteGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(deleted).Should(BeTrue())
@@ -65,8 +62,6 @@ var _ = Describe("Metadata", func() {
                })
 
                It("should add shards", func() {
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_DELETE)).Times(2)
-                       svcs.repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(4)
                        groupSchema, err := 
svcs.metadataService.GroupRegistry().GetGroup(context.TODO(), "default")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(groupSchema).ShouldNot(BeNil())
@@ -95,7 +90,6 @@ var _ = Describe("Metadata", func() {
                        }).WithTimeout(flags.EventuallyTimeout).Should(BeTrue())
                })
                It("should close the stream", func() {
-                       
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_DELETE)).Times(1)
                        deleted, err := 
svcs.metadataService.StreamRegistry().DeleteStream(context.TODO(), 
&commonv1.Metadata{
                                Name:  "sw",
                                Group: "default",
@@ -126,7 +120,6 @@ var _ = Describe("Metadata", func() {
                        })
 
                        It("should update a new stream", func() {
-                               
svcs.repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(1)
                                // Remove the first tag from the entity
                                streamSchema.Entity.TagNames = 
streamSchema.Entity.TagNames[1:]
                                entitySize := len(streamSchema.Entity.TagNames)
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index d395e9ca..07749f5b 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -26,7 +26,6 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -55,10 +54,8 @@ type service struct {
        schemaRepo      schemaRepo
        metadata        metadata.Repo
        pipeline        queue.Queue
-       repo            discovery.ServiceRepo
        writeListener   *writeCallback
        l               *logger.Logger
-       stopCh          chan struct{}
        root            string
        dbOpts          tsdb.DatabaseOpts
        blockBufferSize run.Bytes
@@ -106,7 +103,7 @@ func (s *service) PreRun() error {
        }
        path := path.Join(s.root, s.Name())
        observability.UpdatePath(path)
-       s.schemaRepo = newSchemaRepo(path, s.metadata, s.repo, 
int64(s.blockBufferSize), s.dbOpts, s.l)
+       s.schemaRepo = newSchemaRepo(path, s.metadata, 
int64(s.blockBufferSize), s.dbOpts, s.l)
        for _, g := range groups {
                if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
                        continue
@@ -127,6 +124,10 @@ func (s *service) PreRun() error {
                        }
                }
        }
+       // run a serial watcher
+       s.schemaRepo.Watcher()
+       
s.metadata.RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
+               &s.schemaRepo)
 
        s.writeListener = setUpWriteCallback(s.l, &s.schemaRepo)
 
@@ -138,32 +139,21 @@ func (s *service) PreRun() error {
 }
 
 func (s *service) Serve() run.StopNotify {
-       _ = s.schemaRepo.NotifyAll()
-       // run a serial watcher
-       s.schemaRepo.Watcher()
-
-       
s.metadata.StreamRegistry().RegisterHandler(schema.KindGroup|schema.KindStream|schema.KindIndexRuleBinding|schema.KindIndexRule,
-               &s.schemaRepo)
-       return s.stopCh
+       return s.schemaRepo.StopCh()
 }
 
 func (s *service) GracefulStop() {
        s.schemaRepo.Close()
-       if s.stopCh != nil {
-               close(s.stopCh)
-       }
 }
 
 // NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, repo 
discovery.ServiceRepo, pipeline queue.Queue) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline 
queue.Queue) (Service, error) {
        return &service{
                metadata: metadata,
-               repo:     repo,
                pipeline: pipeline,
                dbOpts: tsdb.DatabaseOpts{
                        EnableGlobalIndex: true,
                        IndexGranularity:  tsdb.IndexGranularityBlock,
                },
-               stopCh: make(chan struct{}),
        }, nil
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index ded686fe..fb60bd64 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -27,7 +27,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
@@ -43,7 +42,6 @@ type stream struct {
        indexWriter            *index.Writer
        name                   string
        group                  string
-       entityLocator          partition.EntityLocator
        indexRules             []*databasev1.IndexRule
        maxObservedModRevision int64
        shardNum               uint32
@@ -69,17 +67,12 @@ func (s *stream) GetTopN() []*databasev1.TopNAggregation {
        return nil
 }
 
-func (s *stream) EntityLocator() partition.EntityLocator {
-       return s.entityLocator
-}
-
 func (s *stream) Close() error {
        return nil
 }
 
 func (s *stream) parseSpec() {
        s.name, s.group = s.schema.GetMetadata().GetName(), 
s.schema.GetMetadata().GetGroup()
-       s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), 
s.schema.GetEntity())
        s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
 }
 
diff --git a/banyand/stream/stream_suite_test.go 
b/banyand/stream/stream_suite_test.go
index e89987f2..fa5fa2c3 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -25,9 +25,6 @@ import (
        . "github.com/onsi/gomega"
        "go.uber.org/mock/gomock"
 
-       "github.com/apache/skywalking-banyandb/api/event"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -63,25 +60,13 @@ func (p *preloadStreamService) PreRun() error {
 type services struct {
        stream          *service
        metadataService metadata.Service
-       repo            *discovery.MockServiceRepo
 }
 
 func setUp() (*services, func()) {
        ctrl := gomock.NewController(GinkgoT())
        Expect(ctrl).ShouldNot(BeNil())
-       // Init Discovery
-       repo := discovery.NewMockServiceRepo(ctrl)
-       repo.EXPECT().NodeID().AnyTimes()
-       repo.EXPECT().Name().AnyTimes()
-       stopCh := make(chan struct{})
-       repo.EXPECT().Serve().Return(stopCh).Times(1)
-       repo.EXPECT().GracefulStop().Do(func() { close(stopCh) }).Times(1)
-       // Both PreRun and Serve phases send events
-       repo.EXPECT().Publish(event.StreamTopicEntityEvent, 
test.NewEntityEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 1)
-       repo.EXPECT().Publish(event.StreamTopicShardEvent, 
test.NewShardEventMatcher(databasev1.Action_ACTION_PUT)).Times(2 * 2)
-
        // Init Pipeline
-       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       pipeline, err := queue.NewQueue(context.TODO())
        Expect(err).NotTo(HaveOccurred())
 
        // Init Metadata Service
@@ -90,7 +75,7 @@ func setUp() (*services, func()) {
        Expect(err).NotTo(HaveOccurred())
 
        // Init Stream Service
-       streamService, err := NewService(context.TODO(), metadataService, repo, 
pipeline)
+       streamService, err := NewService(context.TODO(), metadataService, 
pipeline)
        Expect(err).NotTo(HaveOccurred())
        preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
        var flags []string
@@ -105,7 +90,6 @@ func setUp() (*services, func()) {
        flags = append(flags, "--etcd-listen-client-url="+listenClientURL, 
"--etcd-listen-peer-url="+listenPeerURL)
        moduleDeferFunc := test.SetupModules(
                flags,
-               repo,
                pipeline,
                metadataService,
                preloadStreamSvc,
@@ -114,7 +98,6 @@ func setUp() (*services, func()) {
        return &services{
                        stream:          streamService.(*service),
                        metadataService: metadataService,
-                       repo:            repo,
                }, func() {
                        moduleDeferFunc()
                        metaDeferFunc()
diff --git a/dist/LICENSE b/dist/LICENSE
index 55639375..87b47d39 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -191,7 +191,7 @@ Apache-2.0 licenses
     github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da Apache-2.0
     github.com/google/btree v1.1.2 Apache-2.0
     github.com/google/flatbuffers v1.12.1 Apache-2.0
-    github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 Apache-2.0
+    github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 Apache-2.0
     github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 Apache-2.0
     github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus 
v1.0.0-rc.0 Apache-2.0
     github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5 Apache-2.0
@@ -287,7 +287,7 @@ BSD-3-Clause licenses
     golang.org/x/sys v0.10.0 BSD-3-Clause
     golang.org/x/text v0.11.0 BSD-3-Clause
     golang.org/x/time v0.3.0 BSD-3-Clause
-    golang.org/x/tools v0.11.0 BSD-3-Clause
+    golang.org/x/tools v0.11.1 BSD-3-Clause
     google.golang.org/protobuf v1.31.0 BSD-3-Clause
 
 ========================================================================
@@ -326,7 +326,7 @@ MIT licenses
     github.com/mattn/go-isatty v0.0.19 MIT
     github.com/mitchellh/mapstructure v1.5.0 MIT
     github.com/onsi/ginkgo/v2 v2.11.0 MIT
-    github.com/onsi/gomega v1.27.8 MIT
+    github.com/onsi/gomega v1.27.10 MIT
     github.com/pelletier/go-toml/v2 v2.0.8 MIT
     github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c MIT
     github.com/robfig/cron/v3 v3.0.1 MIT
diff --git a/docs/api-reference.md b/docs/api-reference.md
index e587be94..96931a18 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -341,6 +341,7 @@ Metadata is for multi-tenant, multi-model use
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | id | [string](#string) |  |  |
+| metadata | [banyandb.common.v1.Metadata](#banyandb-common-v1-Metadata) |  |  
|
 | addr | [string](#string) |  |  |
 | updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
 | created_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
diff --git a/go.mod b/go.mod
index 5eea559e..7cf87065 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
        github.com/hashicorp/golang-lru/v2 v2.0.4
        github.com/oklog/run v1.1.0
        github.com/onsi/ginkgo/v2 v2.11.0
-       github.com/onsi/gomega v1.27.8
+       github.com/onsi/gomega v1.27.10
        github.com/pkg/errors v0.9.1
        github.com/prometheus/client_golang v1.16.0
        github.com/rs/zerolog v1.29.1
@@ -77,7 +77,7 @@ require (
        github.com/golang/snappy v0.0.3
        github.com/google/btree v1.1.2 // indirect
        github.com/google/flatbuffers v1.12.1 // indirect
-       github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
+       github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 // indirect
        github.com/gorilla/websocket v1.5.0 // indirect
        github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -140,7 +140,7 @@ require (
        golang.org/x/sys v0.10.0 // indirect
        golang.org/x/text v0.11.0 // indirect
        golang.org/x/time v0.3.0 // indirect
-       golang.org/x/tools v0.11.0 // indirect
+       golang.org/x/tools v0.11.1 // indirect
        google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // 
indirect
        gopkg.in/ini.v1 v1.67.0 // indirect
        gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
diff --git a/go.sum b/go.sum
index cd1e625b..eb7ac7e6 100644
--- a/go.sum
+++ b/go.sum
@@ -222,8 +222,8 @@ github.com/google/pprof 
v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf
 github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
-github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 
h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
-github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod 
h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
+github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 
h1:ZgoomqkdjGbQ3+qQXCkvYMCDvGDNg2k5JJDjjdTB6jY=
+github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6/go.mod 
h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
 github.com/google/renameio v0.1.0/go.mod 
h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -311,8 +311,8 @@ github.com/oklog/run v1.1.0 
h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 github.com/oklog/run v1.1.0/go.mod 
h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/onsi/ginkgo/v2 v2.11.0 
h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
 github.com/onsi/ginkgo/v2 v2.11.0/go.mod 
h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
-github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
-github.com/onsi/gomega v1.27.8/go.mod 
h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ=
+github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
+github.com/onsi/gomega v1.27.10/go.mod 
h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
 github.com/opentracing/opentracing-go v1.1.0/go.mod 
h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/pelletier/go-toml v1.2.0/go.mod 
h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pelletier/go-toml/v2 v2.0.8 
h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@@ -707,8 +707,8 @@ golang.org/x/tools 
v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4f
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.0/go.mod 
h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
-golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
-golang.org/x/tools v0.11.0/go.mod 
h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
+golang.org/x/tools v0.11.1 h1:ojD5zOW8+7dOGzdnNgersm8aPfcDjhMp12UfG93NIMc=
+golang.org/x/tools v0.11.1/go.mod 
h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 31f23f57..6d20cd30 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -26,20 +26,15 @@ import (
        "sync/atomic"
        "time"
 
-       "github.com/onsi/ginkgo/v2"
        "github.com/pkg/errors"
        "go.uber.org/multierr"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
-       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/partition"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -95,7 +90,6 @@ type Resource interface {
        GetIndexRules() []*databasev1.IndexRule
        GetTopN() []*databasev1.TopNAggregation
        MaxObservedModRevision() int64
-       EntityLocator() partition.EntityLocator
        ResourceSchema
        io.Closer
 }
@@ -114,8 +108,8 @@ type Repository interface {
        StoreGroup(groupMeta *commonv1.Metadata) (*group, error)
        LoadGroup(name string) (Group, bool)
        LoadResource(metadata *commonv1.Metadata) (Resource, bool)
-       NotifyAll() (err error)
        Close()
+       StopCh() <-chan struct{}
 }
 
 const defaultWorkerNum = 10
@@ -124,38 +118,36 @@ var _ Repository = (*schemaRepo)(nil)
 
 type schemaRepo struct {
        metadata         metadata.Repo
-       repo             discovery.ServiceRepo
        resourceSupplier ResourceSupplier
        l                *logger.Logger
        data             map[string]*group
+       workerCloser     *run.Closer // used by the Watcher goroutines: each calls Done on exit and watches its CloseNotify
        closer           *run.Closer
        eventCh          chan MetadataEvent
-       shardTopic       bus.Topic
-       entityTopic      bus.Topic
        workerNum        int
        sync.RWMutex
 }
 
+// StopCh implements Repository. It returns the channel obtained from sr.closer.CloseNotify().
+func (sr *schemaRepo) StopCh() <-chan struct{} {
+       return sr.closer.CloseNotify()
+}
+
 // NewRepository return a new Repository.
 func NewRepository(
        metadata metadata.Repo,
-       repo discovery.ServiceRepo,
        l *logger.Logger,
        resourceSupplier ResourceSupplier,
-       shardTopic bus.Topic,
-       entityTopic bus.Topic,
 ) Repository {
        return &schemaRepo{
                metadata:         metadata,
-               repo:             repo,
                l:                l,
                resourceSupplier: resourceSupplier,
-               shardTopic:       shardTopic,
-               entityTopic:      entityTopic,
                data:             make(map[string]*group),
                eventCh:          make(chan MetadataEvent, defaultWorkerNum),
                workerNum:        defaultWorkerNum,
-               closer:           run.NewCloser(defaultWorkerNum),
+               workerCloser:     run.NewCloser(defaultWorkerNum), // same count as workerNum; each Watcher goroutine calls Done on it
+               closer:           run.NewCloser(1), // its CloseNotify channel is what StopCh returns
        }
 }
 
@@ -167,7 +159,7 @@ func (sr *schemaRepo) Watcher() {
        for i := 0; i < sr.workerNum; i++ {
                go func() {
                        defer func() {
-                               sr.closer.Done()
+                               sr.workerCloser.Done()
                                if err := recover(); err != nil {
                                        sr.l.Warn().Interface("err", 
err).Msg("watching the events")
                                }
@@ -202,11 +194,11 @@ func (sr *schemaRepo) Watcher() {
                                                
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
                                                select {
                                                case sr.eventCh <- evt:
-                                               case <-sr.closer.CloseNotify():
+                                               case 
<-sr.workerCloser.CloseNotify():
                                                        return
                                                }
                                        }
-                               case <-sr.closer.CloseNotify():
+                               case <-sr.workerCloser.CloseNotify():
                                        return
                                }
                        }
@@ -232,9 +224,9 @@ func (sr *schemaRepo) StoreGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
                if err != nil {
                        return nil, err
                }
-               g = newGroup(groupSchema, sr.repo, sr.metadata, db, sr.l, 
sr.resourceSupplier, sr.entityTopic)
+               g = newGroup(groupSchema, sr.metadata, db, sr.l, 
sr.resourceSupplier)
                sr.data[name] = g
-               return g, sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
+               return g, nil
        }
        prevGroupSchema := g.GetSchema()
        if groupSchema.GetMetadata().GetModRevision() <= 
prevGroupSchema.Metadata.ModRevision {
@@ -243,20 +235,12 @@ func (sr *schemaRepo) StoreGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
        sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
        db := g.SupplyTSDB()
        db.Close()
-       err = sr.notify(prevGroupSchema, databasev1.Action_ACTION_DELETE)
-       if err != nil {
-               return nil, err
-       }
        sr.l.Info().Str("group", name).Msg("creating a new tsdb")
        newDB, err := sr.resourceSupplier.OpenDB(groupSchema)
        if err != nil {
                return nil, err
        }
        g.setDB(newDB)
-       err = sr.notify(groupSchema, databasev1.Action_ACTION_PUT)
-       if err != nil {
-               return nil, err
-       }
        g.groupSchema.Store(groupSchema)
        return g, nil
 }
@@ -274,7 +258,6 @@ func (sr *schemaRepo) deleteGroup(groupMeta 
*commonv1.Metadata) error {
        if err != nil {
                return err
        }
-       _ = sr.notify(g.GetSchema(), databasev1.Action_ACTION_DELETE)
        delete(sr.data, name)
        return nil
 }
@@ -324,61 +307,13 @@ func (sr *schemaRepo) deleteResource(metadata 
*commonv1.Metadata) error {
        return g.(*group).deleteResource(metadata)
 }
 
-func (sr *schemaRepo) notify(groupSchema *commonv1.Group, action 
databasev1.Action) (err error) {
-       now := time.Now()
-       nowPb := timestamppb.New(now)
-       shardNum := groupSchema.GetResourceOpts().GetShardNum()
-       for i := 0; i < int(shardNum); i++ {
-               _, errInternal := sr.repo.Publish(sr.shardTopic, 
bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.ShardEvent{
-                       Shard: &databasev1.Shard{
-                               Id:    uint64(i),
-                               Total: shardNum,
-                               Metadata: &commonv1.Metadata{
-                                       Name: 
groupSchema.GetMetadata().GetName(),
-                               },
-                               Node: &databasev1.Node{
-                                       Id:        sr.repo.NodeID(),
-                                       CreatedAt: nowPb,
-                                       UpdatedAt: nowPb,
-                                       Addr:      "localhost",
-                               },
-                               UpdatedAt: nowPb,
-                               CreatedAt: nowPb,
-                       },
-                       Time:   nowPb,
-                       Action: action,
-               }))
-               if errors.Is(errInternal, bus.ErrTopicNotExist) {
-                       return nil
-               }
-               if errInternal != nil {
-                       err = multierr.Append(err, errInternal)
-               }
-       }
-       return err
-}
-
-func (sr *schemaRepo) NotifyAll() (err error) {
-       sr.RLock()
-       defer sr.RUnlock()
-       for _, g := range sr.data {
-               err = multierr.Append(err, sr.notify(g.GetSchema(), 
databasev1.Action_ACTION_PUT))
-               g.mapMutex.RLock()
-               for _, s := range g.schemaMap {
-                       err = multierr.Append(err, g.notify(s, 
databasev1.Action_ACTION_PUT))
-               }
-               g.mapMutex.RUnlock()
-       }
-       return err
-}
-
 func (sr *schemaRepo) Close() {
        defer func() {
                if err := recover(); err != nil {
                        sr.l.Warn().Interface("err", err).Msg("closing 
resource")
                }
        }()
-       sr.closer.CloseThenWait()
+       sr.workerCloser.CloseThenWait()
 
        sr.RLock()
        defer sr.RUnlock()
@@ -388,39 +323,35 @@ func (sr *schemaRepo) Close() {
                        sr.l.Err(err).RawJSON("group", 
logger.Proto(g.GetSchema().Metadata)).Msg("closing")
                }
        }
+       sr.closer.Done()
+       sr.closer.CloseThenWait()
 }
 
 var _ Group = (*group)(nil)
 
 type group struct {
        resourceSupplier ResourceSupplier
-       repo             discovery.ServiceRepo
        metadata         metadata.Repo
        db               atomic.Value
        groupSchema      atomic.Pointer[commonv1.Group]
        l                *logger.Logger
        schemaMap        map[string]Resource
-       entityTopic      bus.Topic
        mapMutex         sync.RWMutex
 }
 
 func newGroup(
        groupSchema *commonv1.Group,
-       repo discovery.ServiceRepo,
        metadata metadata.Repo,
        db tsdb.Database,
        l *logger.Logger,
        resourceSupplier ResourceSupplier,
-       entityTopic bus.Topic,
 ) *group {
        g := &group{
                groupSchema:      atomic.Pointer[commonv1.Group]{},
-               repo:             repo,
                metadata:         metadata,
                l:                l,
                schemaMap:        make(map[string]Resource),
                resourceSupplier: resourceSupplier,
-               entityTopic:      entityTopic,
        }
        g.groupSchema.Store(groupSchema)
        g.db.Store(db)
@@ -492,9 +423,6 @@ func (g *group) StoreResource(resourceSchema 
ResourceSchema) (Resource, error) {
        if errTS != nil {
                return nil, errTS
        }
-       if err := g.notify(sm, databasev1.Action_ACTION_PUT); err != nil {
-               return nil, err
-       }
        g.schemaMap[key] = sm
        if preResource != nil {
                _ = preResource.Close()
@@ -510,9 +438,6 @@ func (g *group) deleteResource(metadata *commonv1.Metadata) 
error {
        if preResource == nil {
                return nil
        }
-       if err := g.notify(preResource, databasev1.Action_ACTION_DELETE); err 
!= nil {
-               return err
-       }
        delete(g.schemaMap, key)
        _ = preResource.Close()
        return nil
@@ -528,30 +453,6 @@ func (g *group) LoadResource(name string) (Resource, bool) 
{
        return s, true
 }
 
-func (g *group) notify(resource Resource, action databasev1.Action) error {
-       defer ginkgo.GinkgoRecover()
-       now := time.Now()
-       nowPb := timestamppb.New(now)
-       entityLocator := resource.EntityLocator()
-       locator := make([]*databasev1.EntityEvent_TagLocator, 0, 
len(entityLocator))
-       for _, tagLocator := range entityLocator {
-               locator = append(locator, &databasev1.EntityEvent_TagLocator{
-                       FamilyOffset: uint32(tagLocator.FamilyOffset),
-                       TagOffset:    uint32(tagLocator.TagOffset),
-               })
-       }
-       _, err := g.repo.Publish(g.entityTopic, 
bus.NewMessage(bus.MessageID(now.UnixNano()), &databasev1.EntityEvent{
-               Subject:       resource.GetMetadata(),
-               EntityLocator: locator,
-               Time:          nowPb,
-               Action:        action,
-       }))
-       if errors.Is(err, bus.ErrTopicNotExist) {
-               return nil
-       }
-       return err
-}
-
 func (g *group) close() (err error) {
        g.mapMutex.RLock()
        for _, s := range g.schemaMap {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 7b654471..18070973 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/onsi/gomega"
 
-       "github.com/apache/skywalking-banyandb/banyand/discovery"
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/liaison/http"
        "github.com/apache/skywalking-banyandb/banyand/measure"
@@ -77,37 +76,37 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, 
flags ...string) (str
 }
 
 func modules(schemaLoaders []SchemaLoader, flags []string) func() {
-       // Init `Discovery` module
-       repo, err := discovery.NewServiceRepo(context.Background())
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Queue` module
-       pipeline, err := queue.NewQueue(context.TODO(), repo)
+       pipeline, err := queue.NewQueue(context.TODO())
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Metadata` module
        metaSvc, err := metadata.NewService(context.TODO())
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Stream` module
-       streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, 
pipeline)
+       streamSvc, err := stream.NewService(context.TODO(), metaSvc, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Measure` module
-       measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, 
pipeline)
+       measureSvc, err := measure.NewService(context.TODO(), metaSvc, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Query` module
        q, err := query.NewService(context.TODO(), streamSvc, measureSvc, 
metaSvc, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
+       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
        httpServer := http.NewService()
 
        units := []run.Unit{
-               repo,
                pipeline,
                metaSvc,
+               streamSvc,
+               measureSvc,
+               q,
+               tcp,
+               httpServer,
        }
        for _, sl := range schemaLoaders {
                sl.SetMeta(metaSvc)
                units = append(units, sl)
        }
-       units = append(units, streamSvc, measureSvc, q, tcp, httpServer)
 
        return test.SetupModules(
                flags,


Reply via email to