This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit fe2687b975db70eeb7010e33b81ba00045c61829 Author: Gao Hongtao <[email protected]> AuthorDate: Mon Sep 18 12:49:48 2023 +0000 Refactor node selecting process Signed-off-by: Gao Hongtao <[email protected]> --- banyand/liaison/grpc/discovery.go | 103 ++++++-------------------------- banyand/liaison/grpc/measure.go | 13 +++- banyand/liaison/grpc/node.go | 109 ++++++++++++++++++++++++++++++++++ banyand/liaison/grpc/registry_test.go | 2 +- banyand/liaison/grpc/server.go | 10 ++-- banyand/liaison/grpc/stream.go | 11 +++- banyand/metadata/allocator.go | 78 ------------------------ banyand/metadata/client.go | 15 +---- banyand/metadata/metadata.go | 1 - banyand/metadata/schema/checker.go | 7 --- banyand/metadata/schema/kind.go | 9 +-- banyand/metadata/schema/schema.go | 12 ---- banyand/metadata/schema/shard.go | 74 ----------------------- banyand/queue/local.go | 4 ++ banyand/queue/pub/client.go | 6 ++ banyand/queue/pub/pub.go | 8 ++- banyand/queue/queue.go | 2 + pkg/cmdsetup/liaison.go | 2 +- pkg/cmdsetup/standalone.go | 2 +- 19 files changed, 175 insertions(+), 293 deletions(-) diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 823183b1..b133d21d 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -18,10 +18,8 @@ package grpc import ( - "context" "fmt" "sync" - "time" "github.com/pkg/errors" @@ -42,13 +40,14 @@ var errNotExist = errors.New("the object doesn't exist") type discoveryService struct { pipeline queue.Client metadataRepo metadata.Repo + nodeRegistry NodeRegistry shardRepo *shardRepo entityRepo *entityRepo log *logger.Logger kind schema.Kind } -func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo) *discoveryService { +func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo metadata.Repo, nodeRegistry NodeRegistry) *discoveryService { sr := &shardRepo{shardEventsMap: make(map[identity]uint32)} er := &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)} return &discoveryService{ @@ -57,80 +56,12 @@ func newDiscoveryService(pipeline queue.Client, kind schema.Kind, metadataRepo m pipeline: pipeline, kind: kind, metadataRepo: metadataRepo, + nodeRegistry: nodeRegistry, } } -func (ds *discoveryService) initialize(ctx context.Context) error { - ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) - groups, err := ds.metadataRepo.GroupRegistry().ListGroup(ctxLocal) - cancel() - if err != nil { - return err - } - for _, g := range groups { - switch ds.kind { - case schema.KindMeasure: - case schema.KindStream: - default: - continue - } - ctxLocal, cancel := context.WithTimeout(ctx, 5*time.Second) - shards, innerErr := ds.metadataRepo.ShardRegistry().ListShard(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) - cancel() - if innerErr != nil { - return innerErr - } - for _, s := range shards { - ds.shardRepo.OnAddOrUpdate(schema.Metadata{ - TypeMeta: schema.TypeMeta{ - Kind: schema.KindShard, - Name: s.Metadata.Name, - Group: s.Metadata.Group, - }, - Spec: s, - }) - } - - switch ds.kind { - case schema.KindMeasure: - ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) - mm, innerErr := ds.metadataRepo.MeasureRegistry().ListMeasure(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) - cancel() - if innerErr != nil { - return innerErr - } - for _, m := range mm { - ds.entityRepo.OnAddOrUpdate(schema.Metadata{ - TypeMeta: schema.TypeMeta{ - Kind: schema.KindMeasure, - Name: m.Metadata.Name, - Group: m.Metadata.Group, - }, - Spec: m, - }) - } - case schema.KindStream: - ctxLocal, cancel = context.WithTimeout(ctx, 5*time.Second) - ss, innerErr := ds.metadataRepo.StreamRegistry().ListStream(ctxLocal, schema.ListOpt{Group: g.Metadata.Name}) - cancel() - if innerErr != nil { - return innerErr - } - for _, s := range ss { - ds.entityRepo.OnAddOrUpdate(schema.Metadata{ - TypeMeta: schema.TypeMeta{ - Kind: schema.KindStream, - Name: s.Metadata.Name, - Group: s.Metadata.Group, - }, - Spec: s, - }) - } - default: - return fmt.Errorf("unsupported kind: %d", ds.kind) - } - } - ds.metadataRepo.RegisterHandler("liaison", schema.KindShard, ds.shardRepo) +func (ds *discoveryService) initialize() error { + ds.metadataRepo.RegisterHandler("liaison", schema.KindGroup, ds.shardRepo) ds.metadataRepo.RegisterHandler("liaison", ds.kind, ds.entityRepo) return nil } @@ -172,28 +103,32 @@ type shardRepo struct { sync.RWMutex } -// OnAddOrUpdate implements schema.EventHandler. func (s *shardRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { - if schemaMetadata.Kind != schema.KindShard { + if schemaMetadata.Kind != schema.KindGroup { return } - shard := schemaMetadata.Spec.(*databasev1.Shard) - idx := getID(shard.GetMetadata()) + group := schemaMetadata.Spec.(*commonv1.Group) + if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { + return + } + idx := getID(group.GetMetadata()) if le := s.log.Debug(); le.Enabled() { - le.Stringer("id", idx).Uint32("total", shard.Total).Msg("shard added or updated") + le.Stringer("id", idx).Uint32("total", group.ResourceOpts.ShardNum).Msg("shard added or updated") } s.RWMutex.Lock() defer s.RWMutex.Unlock() - s.shardEventsMap[idx] = shard.Total + s.shardEventsMap[idx] = group.ResourceOpts.ShardNum } -// OnDelete implements schema.EventHandler. func (s *shardRepo) OnDelete(schemaMetadata schema.Metadata) { - if schemaMetadata.Kind != schema.KindShard { + if schemaMetadata.Kind != schema.KindGroup { + return + } + group := schemaMetadata.Spec.(*commonv1.Group) + if group.ResourceOpts == nil || group.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { return } - shard := schemaMetadata.Spec.(*databasev1.Shard) - idx := getID(shard.GetMetadata()) + idx := getID(group.GetMetadata()) if le := s.log.Debug(); le.Enabled() { le.Stringer("id", idx).Msg("shard deleted") } diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 90cf7632..4b3f7da9 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -113,11 +113,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er SeriesHash: tsdb.HashEntity(entity), EntityValues: tagValues.Encode(), } - // TODO: set node id - message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "todo", iwr) + nodeID, errPickNode := ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(), writeRequest.GetMetadata().GetName(), uint32(shardID)) + if errPickNode != nil { + ms.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to pick an available node") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) + continue + } + message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) _, errWritePub := publisher.Publish(data.TopicMeasureWrite, message) if errWritePub != nil { - ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message") + ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a message") + reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, ms.sampled) + continue } reply(nil, modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled) } diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go new file mode 100644 index 00000000..73529604 --- /dev/null +++ b/banyand/liaison/grpc/node.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpc + +import ( + "sort" + "sync" + + "github.com/pkg/errors" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/banyand/queue" +) + +var ( + _ schema.EventHandler = (*clusterNodeService)(nil) + _ NodeRegistry = (*clusterNodeService)(nil) +) + +// NodeRegistry is for locating data node with group/name of the metadata +// together with the shardID calculated from the incoming data. +type NodeRegistry interface { + Locate(group, name string, shardID uint32) (string, error) +} + +type clusterNodeService struct { + metaRepo queue.Client + nodes []string + nodeMutex sync.RWMutex + sync.Once +} + +// NewClusterNodeRegistry creates a cluster node registry. +func NewClusterNodeRegistry(metaRepo queue.Client) NodeRegistry { + nr := &clusterNodeService{ + metaRepo: metaRepo, + } + metaRepo.Register(nr) + return nr +} + +func (n *clusterNodeService) Locate(_, _ string, shardID uint32) (string, error) { + // Use round-robin to select the node. + n.nodeMutex.RLock() + defer n.nodeMutex.RUnlock() + if len(n.nodes) == 0 { + return "", errors.New("no node available") + } + return n.nodes[shardID%uint32(len(n.nodes))], nil +} + +func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) { + switch metadata.Kind { + case schema.KindNode: + n.nodeMutex.Lock() + defer n.nodeMutex.Unlock() + for _, node := range n.nodes { + if node == metadata.Spec.(*databasev1.Node).Metadata.Name { + return + } + } + n.nodes = append(n.nodes, metadata.Spec.(*databasev1.Node).Metadata.Name) + sort.Strings(n.nodes) + default: + } +} + +func (n *clusterNodeService) OnDelete(metadata schema.Metadata) { + switch metadata.Kind { + case schema.KindNode: + n.nodeMutex.Lock() + defer n.nodeMutex.Unlock() + for i, node := range n.nodes { + if node == metadata.Spec.(*databasev1.Node).Metadata.Name { + n.nodes = append(n.nodes[:i], n.nodes[i+1:]...) + break + } + } + default: + } +} + +type localNodeService struct{} + +// NewLocalNodeRegistry creates a local(fake) node registry. +func NewLocalNodeRegistry() NodeRegistry { + return localNodeService{} +} + +// Locate of localNodeService always returns local. +func (localNodeService) Locate(_, _ string, _ uint32) (string, error) { + return "local", nil +} diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go index 6c86d780..6dff21fa 100644 --- a/banyand/liaison/grpc/registry_test.go +++ b/banyand/liaison/grpc/registry_test.go @@ -178,7 +178,7 @@ func setupForRegistry() func() { metaSvc, err := metadata.NewService(context.TODO()) Expect(err).NotTo(HaveOccurred()) - tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc) + tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc, grpc.NewLocalNodeRegistry()) preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc} var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index cc7e0817..a89b7004 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -91,12 +91,12 @@ type server struct { } // NewServer returns a new gRPC server. -func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo) Server { +func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server { streamSVC := &streamService{ - discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry), + discoveryService: newDiscoveryService(pipeline, schema.KindStream, schemaRegistry, nodeRegistry), } measureSVC := &measureService{ - discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry), + discoveryService: newDiscoveryService(pipeline, schema.KindMeasure, schemaRegistry, nodeRegistry), } s := &server{ pipeline: pipeline, @@ -128,7 +128,7 @@ func NewServer(_ context.Context, pipeline queue.Client, schemaRegistry metadata return s } -func (s *server) PreRun(ctx context.Context) error { +func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") s.streamSVC.setLogger(s.log) s.measureSVC.setLogger(s.log) @@ -138,7 +138,7 @@ func (s *server) PreRun(ctx context.Context) error { } for _, c := range components { c.SetLogger(s.log) - if err := c.initialize(ctx); err != nil { + if err := c.initialize(); err != nil { return err } } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 0fbee9a4..0ae2b30d 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -115,10 +115,17 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { if s.log.Debug().Enabled() { iwr.EntityValues = tagValues.Encode() } - message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), "TODO", iwr) + nodeID, errPickNode := s.nodeRegistry.Locate(writeEntity.GetMetadata().GetGroup(), writeEntity.GetMetadata().GetName(), uint32(shardID)) + if errPickNode != nil { + s.sampled.Error().Err(errPickNode).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to pick an available node") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled) + continue + } + message := bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr) _, errWritePub := publisher.Publish(data.TopicStreamWrite, message) if errWritePub != nil { - s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message") + s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Str("nodeID", nodeID).Msg("failed to send a message") + reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, s.sampled) } reply(nil, modelv1.Status_STATUS_SUCCEED, writeEntity.GetMessageId(), stream, s.sampled) } diff --git a/banyand/metadata/allocator.go b/banyand/metadata/allocator.go deleted file mode 100644 index b85470d3..00000000 --- a/banyand/metadata/allocator.go +++ /dev/null @@ -1,78 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package metadata - -import ( - "context" - "time" - - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/banyand/metadata/schema" - "github.com/apache/skywalking-banyandb/pkg/logger" -) - -var _ schema.EventHandler = (*allocator)(nil) - -type allocator struct { - schemaRegistry schema.Registry - l *logger.Logger -} - -func newAllocator(schemaRegistry schema.Registry, logger *logger.Logger) *allocator { - return &allocator{ - schemaRegistry: schemaRegistry, - l: logger, - } -} - -// OnAddOrUpdate implements EventHandler. -func (a *allocator) OnAddOrUpdate(metadata schema.Metadata) { - switch metadata.Kind { - case schema.KindGroup: - groupSchema := metadata.Spec.(*commonv1.Group) - if groupSchema.Catalog == commonv1.Catalog_CATALOG_UNSPECIFIED { - return - } - shardNum := groupSchema.GetResourceOpts().GetShardNum() - syncShard := func(id uint64) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return a.schemaRegistry.CreateOrUpdateShard(ctx, &databasev1.Shard{ - Id: id, - Total: shardNum, - Metadata: &commonv1.Metadata{ - Name: groupSchema.GetMetadata().GetName(), - }, - Node: "TODO", - }) - } - for i := 0; i < int(shardNum); i++ { - _ = syncShard(uint64(i)) - } - case schema.KindNode: - // TODO: handle node - default: - return - } -} - -// OnDelete implements EventHandler. -func (*allocator) OnDelete(schema.Metadata) { - // TODO: handle delete -} diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index e584a123..9a166d38 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -29,7 +29,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" - "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -47,7 +46,6 @@ func NewClient(_ context.Context) (Service, error) { type clientService struct { namespace string schemaRegistry schema.Registry - alc *allocator closer *run.Closer endpoints []string } @@ -91,7 +89,7 @@ func (s *clientService) PreRun(ctx context.Context) error { nodeRoles := val.([]databasev1.Role) ctxRegister, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - if err = s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{ + return s.schemaRegistry.RegisterNode(ctxRegister, &databasev1.Node{ Metadata: &commonv1.Metadata{ Name: node.NodeID, }, @@ -99,12 +97,7 @@ func (s *clientService) PreRun(ctx context.Context) error { HttpAddress: node.HTTPAddress, Roles: nodeRoles, CreatedAt: timestamppb.Now(), - }); err != nil { - return err - } - s.alc = newAllocator(s.schemaRegistry, logger.GetLogger(s.Name()).Named("allocator")) - s.schemaRegistry.RegisterHandler("shard-allocator", schema.KindGroup|schema.KindNode, s.alc) - return nil + }) } func (s *clientService) Serve() run.StopNotify { @@ -149,10 +142,6 @@ func (s *clientService) PropertyRegistry() schema.Property { return s.schemaRegistry } -func (s *clientService) ShardRegistry() schema.Shard { - return s.schemaRegistry -} - func (s *clientService) Name() string { return "metadata" } diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index 58d10318..a5c76de5 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -46,7 +46,6 @@ type Repo interface { GroupRegistry() schema.Group TopNAggregationRegistry() schema.TopNAggregation PropertyRegistry() schema.Property - ShardRegistry() schema.Shard RegisterHandler(string, schema.Kind, schema.EventHandler) } diff --git a/banyand/metadata/schema/checker.go b/banyand/metadata/schema/checker.go index cdedc54b..4a336dc4 100644 --- a/banyand/metadata/schema/checker.go +++ b/banyand/metadata/schema/checker.go @@ -88,13 +88,6 @@ var checkerMap = map[Kind]equalityChecker{ protocmp.IgnoreFields(&databasev1.Node{}, "created_at"), protocmp.Transform()) }, - KindShard: func(a, b proto.Message) bool { - return cmp.Equal(a, b, - protocmp.IgnoreUnknown(), - protocmp.IgnoreFields(&databasev1.Shard{}, "updated_at"), - protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"), - protocmp.Transform()) - }, KindMask: func(a, b proto.Message) bool { return false }, diff --git a/banyand/metadata/schema/kind.go b/banyand/metadata/schema/kind.go index 4f406f86..1a0573bb 100644 --- a/banyand/metadata/schema/kind.go +++ b/banyand/metadata/schema/kind.go @@ -42,10 +42,9 @@ const ( KindTopNAggregation KindProperty KindNode - KindShard KindMask = KindGroup | KindStream | KindMeasure | KindIndexRuleBinding | KindIndexRule | - KindTopNAggregation | KindProperty | KindNode | KindShard + KindTopNAggregation | KindProperty | KindNode KindSize = 9 ) @@ -67,8 +66,6 @@ func (k Kind) key() string { return propertyKeyPrefix case KindNode: return nodeKeyPrefix - case KindShard: - return shardKeyPrefix default: return "unknown" } @@ -97,8 +94,6 @@ func (k Kind) Unmarshal(kv *mvccpb.KeyValue) (Metadata, error) { m = &databasev1.TopNAggregation{} case KindNode: m = &databasev1.Node{} - case KindShard: - m = &databasev1.Shard{} default: return Metadata{}, errUnsupportedEntityType } @@ -143,8 +138,6 @@ func (k Kind) String() string { return "property" case KindNode: return "node" - case KindShard: - return "shard" default: return "unknown" } diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go index 0c7095af..dd4570ca 100644 --- a/banyand/metadata/schema/schema.go +++ b/banyand/metadata/schema/schema.go @@ -54,7 +54,6 @@ type Registry interface { TopNAggregation Property Node - Shard RegisterHandler(string, Kind, EventHandler) } @@ -111,11 +110,6 @@ func (m Metadata) key() (string, error) { }), nil case KindNode: return formatNodeKey(m.Name), nil - case KindShard: - return formatShardKey(&commonv1.Metadata{ - Group: m.Group, - Name: m.Name, - }), nil default: return "", errUnsupportedEntityType } @@ -203,9 +197,3 @@ type Node interface { ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error) RegisterNode(ctx context.Context, node *databasev1.Node) error } - -// Shard allows CRUD shard schemas in a group. -type Shard interface { - CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error - ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error) -} diff --git a/banyand/metadata/schema/shard.go b/banyand/metadata/schema/shard.go deleted file mode 100644 index c6499931..00000000 --- a/banyand/metadata/schema/shard.go +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package schema - -import ( - "context" - - "github.com/pkg/errors" - "google.golang.org/protobuf/types/known/timestamppb" - - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" - databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" -) - -var shardKeyPrefix = "/shards/" - -func (e *etcdSchemaRegistry) CreateOrUpdateShard(ctx context.Context, shard *databasev1.Shard) error { - if shard.UpdatedAt != nil { - shard.UpdatedAt = timestamppb.Now() - } - md := Metadata{ - TypeMeta: TypeMeta{ - Kind: KindShard, - Group: shard.GetMetadata().GetGroup(), - Name: shard.GetMetadata().GetName(), - }, - Spec: shard, - } - _, err := e.update(ctx, md) - if err == nil { - return nil - } - if errors.Is(err, ErrGRPCResourceNotFound) { - shard.CreatedAt = shard.UpdatedAt - md.Spec = shard - _, err = e.create(ctx, md) - return err - } - return err -} - -func (e *etcdSchemaRegistry) ListShard(ctx context.Context, opt ListOpt) ([]*databasev1.Shard, error) { - if opt.Group == "" { - return nil, BadRequest("group", "group should not be empty") - } - messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, shardKeyPrefix), KindShard) - if err != nil { - return nil, err - } - entities := make([]*databasev1.Shard, 0, len(messages)) - for _, message := range messages { - entities = append(entities, message.(*databasev1.Shard)) - } - return entities, nil -} - -func formatShardKey(metadata *commonv1.Metadata) string { - return formatKey(shardKeyPrefix, metadata) -} diff --git a/banyand/queue/local.go b/banyand/queue/local.go index c5b8dc42..a6872881 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -19,6 +19,7 @@ package queue import ( + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -84,6 +85,9 @@ func (*local) GetPort() *uint32 { return nil } +func (*local) Register(schema.EventHandler) { +} + type localBatchPublisher struct { local *bus.Bus } diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index c5c68139..aa9dbbfe 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -88,6 +88,9 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { } c := clusterv1.NewServiceClient(conn) p.clients[name] = &client{conn: conn, client: c} + if p.handler != nil { + p.handler.OnAddOrUpdate(md) + } } func (p *pub) OnDelete(md schema.Metadata) { @@ -112,5 +115,8 @@ func (p *pub) OnDelete(md schema.Metadata) { client.conn.Close() // Close the client connection } delete(p.clients, name) + if p.handler != nil { + p.handler.OnDelete(md) + } } } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index b72e9a07..9e00b13a 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -46,13 +46,17 @@ var ( type pub struct { metadata metadata.Repo + handler schema.EventHandler log *logger.Logger clients map[string]*client closer *run.Closer mu sync.RWMutex } -// GracefulStop implements run.Service. +func (p *pub) Register(handler schema.EventHandler) { + p.handler = handler +} + func (p *pub) GracefulStop() { p.closer.Done() p.closer.CloseThenWait() @@ -133,12 +137,10 @@ func New(metadata metadata.Repo) queue.Client { } } -// Name implements run.PreRunner. func (*pub) Name() string { return "queue-client" } -// PreRun implements run.PreRunner. func (p *pub) PreRun(context.Context) error { p.log = logger.GetLogger("server-queue") p.metadata.RegisterHandler("queue-client", schema.KindNode, p) diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 7bb8c531..d518d331 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -20,6 +20,7 @@ package queue import ( "io" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -39,6 +40,7 @@ type Client interface { bus.Publisher bus.Broadcaster NewBatchPublisher() BatchPublisher + Register(schema.EventHandler) } // Server is the interface for receiving data from the queue. diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go index 3ef2eb44..e1178ef3 100644 --- a/pkg/cmdsetup/liaison.go +++ b/pkg/cmdsetup/liaison.go @@ -43,7 +43,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command { l.Fatal().Err(err).Msg("failed to initiate metadata service") } pipeline := pub.New(metaSvc) - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewClusterNodeRegistry(pipeline)) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer() diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index f0dca2f2..3c659b09 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -57,7 +57,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } - grpcServer := grpc.NewServer(ctx, pipeline, metaSvc) + grpcServer := grpc.NewServer(ctx, pipeline, metaSvc, grpc.NewLocalNodeRegistry()) profSvc := observability.NewProfService() metricSvc := observability.NewMetricService() httpServer := http.NewServer()
