This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit fe2687b975db70eeb7010e33b81ba00045c61829
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Sep 18 12:49:48 2023 +0000

    Refactor node selecting process
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/liaison/grpc/discovery.go     | 103 ++++++--------------------------
 banyand/liaison/grpc/measure.go       |  13 +++-
 banyand/liaison/grpc/node.go          | 109 ++++++++++++++++++++++++++++++++++
 banyand/liaison/grpc/registry_test.go |   2 +-
 banyand/liaison/grpc/server.go        |  10 ++--
 banyand/liaison/grpc/stream.go        |  11 +++-
 banyand/metadata/allocator.go         |  78 ------------------------
 banyand/metadata/client.go            |  15 +----
 banyand/metadata/metadata.go          |   1 -
 banyand/metadata/schema/checker.go    |   7 ---
 banyand/metadata/schema/kind.go       |   9 +--
 banyand/metadata/schema/schema.go     |  12 ----
 banyand/metadata/schema/shard.go      |  74 -----------------------
 banyand/queue/local.go                |   4 ++
 banyand/queue/pub/client.go           |   6 ++
 banyand/queue/pub/pub.go              |   8 ++-
 banyand/queue/queue.go                |   2 +
 pkg/cmdsetup/liaison.go               |   2 +-
 pkg/cmdsetup/standalone.go            |   2 +-
 19 files changed, 175 insertions(+), 293 deletions(-)

diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index 823183b1..b133d21d 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -18,10 +18,8 @@
 package grpc
 
 import (
-       "context"
        "fmt"
        "sync"
-       "time"
 
        "github.com/pkg/errors"
 
@@ -42,13 +40,14 @@ var errNotExist = errors.New("the object doesn't exist")
 type discoveryService struct {
        pipeline     queue.Client
        metadataRepo metadata.Repo
+       nodeRegistry NodeRegistry
        shardRepo    *shardRepo
        entityRepo   *entityRepo
        log          *logger.Logger
        kind         schema.Kind
 }
 
-func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo 
metadata.Repo) *discoveryService {
+func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo 
metadata.Repo, nodeRegistry NodeRegistry) *discoveryService {
        sr := &shardRepo{shardEventsMap: make(map[identity]uint32)}
        er := &entityRepo{entitiesMap: 
make(map[identity]partition.EntityLocator)}
        return &discoveryService{
@@ -57,80 +56,12 @@ func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo m
                pipeline:     pipeline,
                kind:         kind,
                metadataRepo: metadataRepo,
+               nodeRegistry: nodeRegistry,
        }
 }
 
-func (ds *discoveryService) initialize(ctx context.Context) error {
-       ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second)
-       groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctxLocal)
-       cancel()
-       if err != nil {
-               return err
-       }
-       for _, g := range groups {
-               switch ds.kind {
-               case schema.KindMeasure:
-               case schema.KindStream:
-               default:
-                       continue
-               }
-               ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second)
-               shards, innerErr := 
ds.metadataRepo.ShardRegistry().ListShard(ctxLocal, schema.ListOpt{Group: 
g.Metadata.Name})
-               cancel()
-               if innerErr != nil {
-                       return innerErr
-               }
-               for _, s := range shards {
-                       ds.shardRepo.OnAddOrUpdate(schema.Metadata{
-                               TypeMeta: schema.TypeMeta{
-                                       Kind:  schema.KindShard,
-                                       Name:  s.Metadata.Name,
-                                       Group: s.Metadata.Group,
-                               },
-                               Spec: s,
-                       })
-               }
-
-               switch ds.kind {
-               case schema.KindMeasure:
-                       ctxLocal, cancel = context.WithTimeout(ctx, 
5*time.Second)
-                       mm, innerErr := 
ds.metadataRepo.MeasureRegistry().ListMeasure(ctxLocal, schema.ListOpt{Group: 
g.Metadata.Name})
-                       cancel()
-                       if innerErr != nil {
-                               return innerErr
-                       }
-                       for _, m := range mm {
-                               ds.entityRepo.OnAddOrUpdate(schema.Metadata{
-                                       TypeMeta: schema.TypeMeta{
-                                               Kind:  schema.KindMeasure,
-                                               Name:  m.Metadata.Name,
-                                               Group: m.Metadata.Group,
-                                       },
-                                       Spec: m,
-                               })
-                       }
-               case schema.KindStream:
-                       ctxLocal, cancel = context.WithTimeout(ctx, 
5*time.Second)
-                       ss, innerErr := 
ds.metadataRepo.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: 
g.Metadata.Name})
-                       cancel()
-                       if innerErr != nil {
-                               return innerErr
-                       }
-                       for _, s := range ss {
-                               ds.entityRepo.OnAddOrUpdate(schema.Metadata{
-                                       TypeMeta: schema.TypeMeta{
-                                               Kind:  schema.KindStream,
-                                               Name:  s.Metadata.Name,
-                                               Group: s.Metadata.Group,
-                                       },
-                                       Spec: s,
-                               })
-                       }
-               default:
-                       return fmt.Errorf("unsupported kind: %d", ds.kind)
-               }
-       }
-       ds.metadataRepo.RegisterHandler("liaison", schema.KindShard, 
ds.shardRepo)
+func (ds *discoveryService) initialize() error {
+       ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, 
ds.shardRepo)
        ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo)
        return nil
 }
@@ -172,28 +103,32 @@ type shardRepo struct {
        sync.RWMutex
 }
 
-// OnAddOrUpdate implements schema.EventHandler.
 func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) {
-       if schemaMetadata.Kind != schema.KindShard {
+       if schemaMetadata.Kind != schema.KindGroup {
                return
        }
-       shard := schemaMetadata.Spec.(*databasev1.Shard)
-       idx := getID(shard.GetMetadata())
+       group := schemaMetadata.Spec.(*commonv1.Group)
+       if group.ResourceOpts == nil || group.Catalog == 
commonv1.Catalog_CATALOG_UNSPECIFIED {
+               return
+       }
+       idx := getID(group.GetMetadata())
        if le := s.log.Debug(); le.Enabled() {
-               le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard 
added or updated")
+               le.Stringer("id", idx).Uint32("total", 
group.ResourceOpts.ShardNum).Msg("shard added or updated")
        }
        s.RWMutex.Lock()
        defer s.RWMutex.Unlock()
-       s.shardEventsMap[idx] = shard.Total
+       s.shardEventsMap[idx] = group.ResourceOpts.ShardNum
 }
 
-// OnDelete implements schema.EventHandler.
 func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) {
-       if schemaMetadata.Kind != schema.KindShard {
+       if schemaMetadata.Kind != schema.KindGroup {
+               return
+       }
+       group := schemaMetadata.Spec.(*commonv1.Group)
+       if group.ResourceOpts == nil || group.Catalog == 
commonv1.Catalog_CATALOG_UNSPECIFIED {
                return
        }
-       shard := schemaMetadata.Spec.(*databasev1.Shard)
-       idx := getID(shard.GetMetadata())
+       idx := getID(group.GetMetadata())
        if le := s.log.Debug(); le.Enabled() {
                le.Stringer("id", idx).Msg("shard deleted")
        }
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 90cf7632..4b3f7da9 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -113,11 +113,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
                        SeriesHash:   tsdb.HashEntity(entity),
                        EntityValues: tagValues.Encode(),
                }
-               // TODO: set node id
-               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "todo", iwr)
+               nodeID, errPickNode := 
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), 
writeRequest.GetMetadata().GetName(), uint32(shardID))
+               if errPickNode != nil {
+                       ms.sampled.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to pick an available node")
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, 
ms.sampled)
+                       continue
+               }
+               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
                _, errWritePub := publisher.Publish(data.TopicMeasureWrite, 
message)
                if errWritePub != nil {
-                       ms.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to send a message")
+                       ms.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a 
message")
+                       reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, 
ms.sampled)
+                       continue
                }
                reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeRequest.GetMessageId(), measure, ms.sampled)
        }
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
new file mode 100644
index 00000000..73529604
--- /dev/null
+++ b/banyand/liaison/grpc/node.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "sort"
+       "sync"
+
+       "github.com/pkg/errors"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+var (
+       _ schema.EventHandler = (*clusterNodeService)(nil)
+       _ NodeRegistry        = (*clusterNodeService)(nil)
+)
+
+// NodeRegistry is for locating data node with group/name of the metadata
+// together with the shardID calculated from the incoming data.
+type NodeRegistry interface {
+       Locate(group, name string, shardID uint32) (string, error)
+}
+
+type clusterNodeService struct {
+       metaRepo  queue.Client
+       nodes     []string
+       nodeMutex sync.RWMutex
+       sync.Once
+}
+
+// NewClusterNodeRegistry creates a cluster node registry.
+func NewClusterNodeRegistry(metaRepo queue.Client) NodeRegistry {
+       nr := &clusterNodeService{
+               metaRepo: metaRepo,
+       }
+       metaRepo.Register(nr)
+       return nr
+}
+
+func (n *clusterNodeService) Locate(_, _ string, shardID uint32) (string, 
error) {
+       // Pick the node by shardID modulo the number of registered nodes.
+       n.nodeMutex.RLock()
+       defer n.nodeMutex.RUnlock()
+       if len(n.nodes) == 0 {
+               return "", errors.New("no node available")
+       }
+       return n.nodes[shardID%uint32(len(n.nodes))], nil
+}
+
+func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) {
+       switch metadata.Kind {
+       case schema.KindNode:
+               n.nodeMutex.Lock()
+               defer n.nodeMutex.Unlock()
+               for _, node := range n.nodes {
+                       if node == 
metadata.Spec.(*databasev1.Node).Metadata.Name {
+                               return
+                       }
+               }
+               n.nodes = append(n.nodes, 
metadata.Spec.(*databasev1.Node).Metadata.Name)
+               sort.Strings(n.nodes)
+       default:
+       }
+}
+
+func (n *clusterNodeService) OnDelete(metadata schema.Metadata) {
+       switch metadata.Kind {
+       case schema.KindNode:
+               n.nodeMutex.Lock()
+               defer n.nodeMutex.Unlock()
+               for i, node := range n.nodes {
+                       if node == 
metadata.Spec.(*databasev1.Node).Metadata.Name {
+                               n.nodes = append(n.nodes[:i], n.nodes[i+1:]...)
+                               break
+                       }
+               }
+       default:
+       }
+}
+
+type localNodeService struct{}
+
+// NewLocalNodeRegistry creates a local(fake) node registry.
+func NewLocalNodeRegistry() NodeRegistry {
+       return localNodeService{}
+}
+
+// Locate of localNodeService always returns local.
+func (localNodeService) Locate(_, _ string, _ uint32) (string, error) {
+       return "local", nil
+}
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 6c86d780..6dff21fa 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -178,7 +178,7 @@ func setupForRegistry() func() {
        metaSvc, err := metadata.NewService(context.TODO())
        Expect(err).NotTo(HaveOccurred())
 
-       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc)
+       tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index cc7e0817..a89b7004 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -91,12 +91,12 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry 
metadata.Repo) Server {
+func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry 
metadata.Repo, nodeRegistry NodeRegistry) Server {
        streamSVC := &streamService{
-               discoveryService: newDiscoveryService(pipeline, 
schema.KindStream, schemaRegistry),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindStream, schemaRegistry, nodeRegistry),
        }
        measureSVC := &measureService{
-               discoveryService: newDiscoveryService(pipeline, 
schema.KindMeasure, schemaRegistry),
+               discoveryService: newDiscoveryService(pipeline, 
schema.KindMeasure, schemaRegistry, nodeRegistry),
        }
        s := &server{
                pipeline:   pipeline,
@@ -128,7 +128,7 @@ func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata
        return s
 }
 
-func (s *server) PreRun(ctx context.Context) error {
+func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
        s.streamSVC.setLogger(s.log)
        s.measureSVC.setLogger(s.log)
@@ -138,7 +138,7 @@ func (s *server) PreRun(ctx context.Context) error {
        }
        for _, c := range components {
                c.SetLogger(s.log)
-               if err := c.initialize(ctx); err != nil {
+               if err := c.initialize(); err != nil {
                        return err
                }
        }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0fbee9a4..0ae2b30d 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -115,10 +115,17 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
                if s.log.Debug().Enabled() {
                        iwr.EntityValues = tagValues.Encode()
                }
-               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "TODO", iwr)
+               nodeID, errPickNode := 
s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), 
writeEntity.GetMetadata().GetName(), uint32(shardID))
+               if errPickNode != nil {
+                       s.sampled.Error().Err(errPickNode).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to pick an available node")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, 
s.sampled)
+                       continue
+               }
+               message := 
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
                _, errWritePub := publisher.Publish(data.TopicStreamWrite, 
message)
                if errWritePub != nil {
-                       s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to send a message")
+                       s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message")
+                       reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, 
s.sampled)
                }
                reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeEntity.GetMessageId(), stream, s.sampled)
        }
diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go
deleted file mode 100644
index b85470d3..00000000
--- a/banyand/metadata/allocator.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package metadata
-
-import (
-       "context"
-       "time"
-
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var _ schema.EventHandler = (*allocator)(nil)
-
-type allocator struct {
-       schemaRegistry schema.Registry
-       l              *logger.Logger
-}
-
-func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger) 
*allocator {
-       return &allocator{
-               schemaRegistry: schemaRegistry,
-               l:              logger,
-       }
-}
-
-// OnAddOrUpdate implements EventHandler.
-func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) {
-       switch metadata.Kind {
-       case schema.KindGroup:
-               groupSchema := metadata.Spec.(*commonv1.Group)
-               if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED {
-                       return
-               }
-               shardNum := groupSchema.GetResourceOpts().GetShardNum()
-               syncShard := func(id uint64) error {
-                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
-                       defer cancel()
-                       return a.schemaRegistry.CreateOrUpdateShard(ctx, 
&databasev1.Shard{
-                               Id:    id,
-                               Total: shardNum,
-                               Metadata: &commonv1.Metadata{
-                                       Name: 
groupSchema.GetMetadata().GetName(),
-                               },
-                               Node: "TODO",
-                       })
-               }
-               for i := 0; i < int(shardNum); i++ {
-                       _ = syncShard(uint64(i))
-               }
-       case schema.KindNode:
-               // TODO: handle node
-       default:
-               return
-       }
-}
-
-// OnDelete implements EventHandler.
-func (*allocator) OnDelete(schema.Metadata) {
-       // TODO: handle delete
-}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index e584a123..9a166d38 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -29,7 +29,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -47,7 +46,6 @@ func NewClient(_ context.Context) (Service, error) {
 type clientService struct {
        namespace      string
        schemaRegistry schema.Registry
-       alc            *allocator
        closer         *run.Closer
        endpoints      []string
 }
@@ -91,7 +89,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
        nodeRoles := val.([]databasev1.Role)
        ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5)
        defer cancel()
-       if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
+       return s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{
                Metadata: &commonv1.Metadata{
                        Name: node.NodeID,
                },
@@ -99,12 +97,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
                HttpAddress: node.HTTPAddress,
                Roles:       nodeRoles,
                CreatedAt:   timestamppb.Now(),
-       }); err != nil {
-               return err
-       }
-       s.alc = newAllocator(s.schemaRegistry, 
logger.GetLogger(s.Name()).Named("allocator"))
-       s.schemaRegistry.RegisterHandler("shard-allocator", 
schema.KindGroup|schema.KindNode, s.alc)
-       return nil
+       })
 }
 
 func (s *clientService) Serve() run.StopNotify {
@@ -149,10 +142,6 @@ func (s *clientService) PropertyRegistry() schema.Property {
        return s.schemaRegistry
 }
 
-func (s *clientService) ShardRegistry() schema.Shard {
-       return s.schemaRegistry
-}
-
 func (s *clientService) Name() string {
        return "metadata"
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 58d10318..a5c76de5 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -46,7 +46,6 @@ type Repo interface {
        GroupRegistry() schema.Group
        TopNAggregationRegistry() schema.TopNAggregation
        PropertyRegistry() schema.Property
-       ShardRegistry() schema.Shard
        RegisterHandler(string, schema.Kind, schema.EventHandler)
 }
 
diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go
index cdedc54b..4a336dc4 100644
--- a/banyand/metadata/schema/checker.go
+++ b/banyand/metadata/schema/checker.go
@@ -88,13 +88,6 @@ var checkerMap = map[Kind]equalityChecker{
                        protocmp.IgnoreFields(&databasev1.Node{}, "created_at"),
                        protocmp.Transform())
        },
-       KindShard: func(a, b proto.Message) bool {
-               return cmp.Equal(a, b,
-                       protocmp.IgnoreUnknown(),
-                       protocmp.IgnoreFields(&databasev1.Shard{}, 
"updated_at"),
-                       protocmp.IgnoreFields(&commonv1.Metadata{}, "id", 
"create_revision", "mod_revision"),
-                       protocmp.Transform())
-       },
        KindMask: func(a, b proto.Message) bool {
                return false
        },
diff --git a/banyand/metadata/schema/kind.go b/banyand/metadata/schema/kind.go
index 4f406f86..1a0573bb 100644
--- a/banyand/metadata/schema/kind.go
+++ b/banyand/metadata/schema/kind.go
@@ -42,10 +42,9 @@ const (
        KindTopNAggregation
        KindProperty
        KindNode
-       KindShard
        KindMask = KindGroup | KindStream | KindMeasure |
                KindIndexRuleBinding | KindIndexRule |
-               KindTopNAggregation | KindProperty | KindNode | KindShard
+               KindTopNAggregation | KindProperty | KindNode
        KindSize = 9
 )
 
@@ -67,8 +66,6 @@ func (k Kind) key() string {
                return propertyKeyPrefix
        case KindNode:
                return nodeKeyPrefix
-       case KindShard:
-               return shardKeyPrefix
        default:
                return "unknown"
        }
@@ -97,8 +94,6 @@ func (k Kind) Unmarshal(kv *mvccpb.KeyValue) (Metadata, error) {
                m = &databasev1.TopNAggregation{}
        case KindNode:
                m = &databasev1.Node{}
-       case KindShard:
-               m = &databasev1.Shard{}
        default:
                return Metadata{}, errUnsupportedEntityType
        }
@@ -143,8 +138,6 @@ func (k Kind) String() string {
                return "property"
        case KindNode:
                return "node"
-       case KindShard:
-               return "shard"
        default:
                return "unknown"
        }
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 0c7095af..dd4570ca 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -54,7 +54,6 @@ type Registry interface {
        TopNAggregation
        Property
        Node
-       Shard
        RegisterHandler(string, Kind, EventHandler)
 }
 
@@ -111,11 +110,6 @@ func (m Metadata) key() (string, error) {
                }), nil
        case KindNode:
                return formatNodeKey(m.Name), nil
-       case KindShard:
-               return formatShardKey(&commonv1.Metadata{
-                       Group: m.Group,
-                       Name:  m.Name,
-               }), nil
        default:
                return "", errUnsupportedEntityType
        }
@@ -203,9 +197,3 @@ type Node interface {
        ListNode(ctx context.Context, role databasev1.Role) 
([]*databasev1.Node, error)
        RegisterNode(ctx context.Context, node *databasev1.Node) error
 }
-
-// Shard allows CRUD shard schemas in a group.
-type Shard interface {
-       CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error
-       ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error)
-}
diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go
deleted file mode 100644
index c6499931..00000000
--- a/banyand/metadata/schema/shard.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package schema
-
-import (
-       "context"
-
-       "github.com/pkg/errors"
-       "google.golang.org/protobuf/types/known/timestamppb"
-
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-)
-
-var shardKeyPrefix = "/shards/"
-
-func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard 
*databasev1.Shard) error {
-       if shard.UpdatedAt != nil {
-               shard.UpdatedAt = timestamppb.Now()
-       }
-       md := Metadata{
-               TypeMeta: TypeMeta{
-                       Kind:  KindShard,
-                       Group: shard.GetMetadata().GetGroup(),
-                       Name:  shard.GetMetadata().GetName(),
-               },
-               Spec: shard,
-       }
-       _, err := e.update(ctx, md)
-       if err == nil {
-               return nil
-       }
-       if errors.Is(err, ErrGRPCResourceNotFound) {
-               shard.CreatedAt = shard.UpdatedAt
-               md.Spec = shard
-               _, err = e.create(ctx, md)
-               return err
-       }
-       return err
-}
-
-func (e *etcdSchemaRegistry) ListShard(ctx context.Context, opt ListOpt) 
([]*databasev1.Shard, error) {
-       if opt.Group == "" {
-               return nil, BadRequest("group", "group should not be empty")
-       }
-       messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, 
shardKeyPrefix), KindShard)
-       if err != nil {
-               return nil, err
-       }
-       entities := make([]*databasev1.Shard, 0, len(messages))
-       for _, message := range messages {
-               entities = append(entities, message.(*databasev1.Shard))
-       }
-       return entities, nil
-}
-
-func formatShardKey(metadata *commonv1.Metadata) string {
-       return formatKey(shardKeyPrefix, metadata)
-}
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index c5b8dc42..a6872881 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,6 +19,7 @@
 package queue
 
 import (
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -84,6 +85,9 @@ func (*local) GetPort() *uint32 {
        return nil
 }
 
+func (*local) Register(schema.EventHandler) {
+}
+
 type localBatchPublisher struct {
        local *bus.Bus
 }
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index c5c68139..aa9dbbfe 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -88,6 +88,9 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        }
        c := clusterv1.NewServiceClient(conn)
        p.clients[name] = &client{conn: conn, client: c}
+       if p.handler != nil {
+               p.handler.OnAddOrUpdate(md)
+       }
 }
 
 func (p *pub) OnDelete(md schema.Metadata) {
@@ -112,5 +115,8 @@ func (p *pub) OnDelete(md schema.Metadata) {
                        client.conn.Close() // Close the client connection
                }
                delete(p.clients, name)
+               if p.handler != nil {
+                       p.handler.OnDelete(md)
+               }
        }
 }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index b72e9a07..9e00b13a 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -46,13 +46,17 @@ var (
 
 type pub struct {
        metadata metadata.Repo
+       handler  schema.EventHandler
        log      *logger.Logger
        clients  map[string]*client
        closer   *run.Closer
        mu       sync.RWMutex
 }
 
-// GracefulStop implements run.Service.
+func (p *pub) Register(handler schema.EventHandler) {
+       p.handler = handler
+}
+
 func (p *pub) GracefulStop() {
        p.closer.Done()
        p.closer.CloseThenWait()
@@ -133,12 +137,10 @@ func New(metadata metadata.Repo) queue.Client {
        }
 }
 
-// Name implements run.PreRunner.
 func (*pub) Name() string {
        return "queue-client"
 }
 
-// PreRun implements run.PreRunner.
 func (p *pub) PreRun(context.Context) error {
        p.log = logger.GetLogger("server-queue")
        p.metadata.RegisterHandler("queue-client", schema.KindNode, p)
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 7bb8c531..d518d331 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -20,6 +20,7 @@ package queue
 import (
        "io"
 
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -39,6 +40,7 @@ type Client interface {
        bus.Publisher
        bus.Broadcaster
        NewBatchPublisher() BatchPublisher
+       Register(schema.EventHandler)
 }
 
 // Server is the interface for receiving data from the queue.
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 3ef2eb44..e1178ef3 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -43,7 +43,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
        pipeline := pub.New(metaSvc)
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, 
grpc.NewClusterNodeRegistry(pipeline))
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index f0dca2f2..3c659b09 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -57,7 +57,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc)
+       grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService()
        httpServer := http.NewServer()

Reply via email to