This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch liaison-topology in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 841ca0c8cf39c82028ded9d5ea42050c897511d5
Author: Megrez Lu <[email protected]>
AuthorDate: Thu Aug 31 18:41:19 2023 +0800

    add discovery module
---
 banyand/discovery/service.go | 77 ++++++++++++++++++++++++++++++++++++++++++++
 pkg/discovery/register.go    | 67 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+)

diff --git a/banyand/discovery/service.go b/banyand/discovery/service.go
new file mode 100644
index 00000000..385550d2
--- /dev/null
+++ b/banyand/discovery/service.go
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package discovery provides the discovery service for a BanyanDB cluster.
+package discovery
+
+import (
+	"context"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+var (
+	_ Service = (*service)(nil)
+)
+
+// Service is the discovery service for a BanyanDB cluster.
+type Service interface {
+	run.PreRunner
+	run.Service
+	run.Config
+}
+
+// service is the Service implementation declared in this file. It records the
+// role and ID of the local node and owns the closer whose close notification is
+// handed out by Serve.
+type service struct {
+	nodeRole string
+	nodeID   string
+	closer   *run.Closer
+}
+
+// Name returns the fixed name of this unit, "discovery".
+func (s *service) Name() string {
+	return "discovery"
+}
+
+// PreRun copies the node role and node ID out of ctx when they are present as
+// strings, and makes sure the closer exists before Serve and GracefulStop use
+// it. A missing or non-string context value leaves the corresponding field
+// untouched. It always returns nil.
+//
+// NOTE(review): the string type assertions silently skip values of any other
+// type — confirm what the caller actually stores under these context keys.
+func (s *service) PreRun(ctx context.Context) error {
+	if role, ok := ctx.Value(common.ContextNodeRolesKey).(string); ok {
+		s.nodeRole = role
+	}
+
+	if id, ok := ctx.Value(common.ContextNodeKey).(string); ok {
+		s.nodeID = id
+	}
+
+	// Nothing else in this file creates the closer, so build it here. The single
+	// running slot reserved now is released again by GracefulStop.
+	if s.closer == nil {
+		s.closer = run.NewCloser(1)
+	}
+
+	return nil
+}
+
+// Serve returns the close notification of the closer. PreRun must have been
+// called first, otherwise the closer is still nil.
+func (s *service) Serve() run.StopNotify {
+	return s.closer.CloseNotify()
+}
+
+// GracefulStop releases the running slot reserved in PreRun and closes the
+// closer. It does nothing when PreRun never ran.
+func (s *service) GracefulStop() {
+	if s.closer == nil {
+		return
+	}
+	s.closer.Done()
+	s.closer.CloseThenWait()
+}
+
+// FlagSet returns an empty flag set named after the service; no command-line
+// flags are defined for the discovery service yet.
+func (s *service) FlagSet() *run.FlagSet {
+	return run.NewFlagSet(s.Name())
+}
+
+// Validate accepts every configuration because there is nothing to check yet.
+func (s *service) Validate() error {
+	return nil
+}
diff --git a/pkg/discovery/register.go b/pkg/discovery/register.go
new file mode 100644
index 00000000..1279e899
--- /dev/null
+++ b/pkg/discovery/register.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package discovery provides APIs for membership registration and discovery.
+package discovery
+
+import (
+	"context"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// ServiceRegister publishes a single key/value pair in etcd under a lease and
+// keeps that lease alive until Close is called.
+type ServiceRegister struct {
+	client          *clientv3.Client
+	closer          *run.Closer
+	l               *logger.Logger
+	key             string
+	value           string
+	ttl             int64
+	leaseID         clientv3.LeaseID
+	keepAliveChan   <-chan *clientv3.LeaseKeepAliveResponse
+	cancelKeepAlive context.CancelFunc
+}
+
+// RegisterSelf grants a lease with the configured TTL (seconds, per the etcd
+// lease API), stores key/value under that lease and starts keeping the lease
+// alive. The lease ID and the keep-alive channel are recorded only after all
+// three steps succeed. If a step after the grant fails, the fresh lease is
+// revoked again so that it does not linger until its TTL runs out. The etcd
+// error of the failing step is returned unchanged.
+//
+// NOTE(review): every call runs on context.Background(), so an unreachable
+// etcd can block the caller indefinitely — consider a deadline.
+func (sr *ServiceRegister) RegisterSelf() error {
+	ctx := context.Background()
+
+	lease, err := sr.client.Grant(ctx, sr.ttl)
+	if err != nil {
+		return err
+	}
+
+	if _, err = sr.client.Put(ctx, sr.key, sr.value, clientv3.WithLease(lease.ID)); err != nil {
+		// Best effort only: the Put error is the one worth reporting.
+		_, _ = sr.client.Revoke(ctx, lease.ID)
+		return err
+	}
+
+	keepAliveCtx, cancel := context.WithCancel(ctx)
+	responses, err := sr.client.KeepAlive(keepAliveCtx, lease.ID)
+	if err != nil {
+		cancel()
+		// Best effort only: revoking the lease also drops the key attached to it.
+		_, _ = sr.client.Revoke(ctx, lease.ID)
+		return err
+	}
+
+	sr.leaseID = lease.ID
+	sr.keepAliveChan = responses
+	sr.cancelKeepAlive = cancel
+
+	// Keep-alive responses must be consumed, otherwise the client drops them
+	// once the channel is full. The goroutine ends when the channel is closed,
+	// which Close triggers by cancelling keepAliveCtx and revoking the lease.
+	go func() {
+		for range responses {
+		}
+	}()
+
+	return nil
+}
+
+// Close stops the keep-alive loop and revokes the lease obtained by
+// RegisterSelf. It returns nil without contacting etcd when no lease is held,
+// so it is safe to call before RegisterSelf or more than once. When the revoke
+// fails the lease ID is kept, so a later Close can retry.
+func (sr *ServiceRegister) Close() error {
+	if sr.cancelKeepAlive != nil {
+		sr.cancelKeepAlive()
+		sr.cancelKeepAlive = nil
+	}
+	if sr.leaseID == clientv3.NoLease {
+		return nil
+	}
+	if _, err := sr.client.Revoke(context.Background(), sr.leaseID); err != nil {
+		return err
+	}
+	sr.leaseID = clientv3.NoLease
+	return nil
+}
