This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch startup
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d57d1dca5d3e0203970dec72cca304f67b37f88f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 19 05:03:12 2024 +0000

    Reconnect to etcd during the startup phase
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                 |  1 +
 banyand/metadata/client.go | 76 +++++++++++++++++++++++++++++++++++++---------
 scripts/push-release.sh    |  1 +
 3 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 20cd8b38..22c7ee35 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -42,6 +42,7 @@ Release Notes.
 - Fix several "sync.Pool" leak issues by adding a tracker to the pool.
 - Fix panic when removing a expired segment.
 - Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order.
+- Fix the etcd client failing to reconnect after a context timeout during the startup phase.
 
 ### Documentation
 
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 61ba8164..94a7f1bd 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -19,6 +19,9 @@ package metadata
 
 import (
        "context"
+       "os"
+       "os/signal"
+       "syscall"
        "time"
 
        "github.com/pkg/errors"
@@ -68,6 +71,7 @@ type clientService struct {
        etcdTLSCertFile   string
        etcdTLSKeyFile    string
        endpoints         []string
+       registryTimeout   time.Duration
        forceRegisterNode bool
 }
 
@@ -84,6 +88,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
        fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted 
certificate authority")
        fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client 
certificate")
        fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key 
for the etcd client certificate.")
+       fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 
2*time.Minute, "The timeout for the node registry")
        return fs
 }
 
@@ -95,17 +100,50 @@ func (s *clientService) Validate() error {
 }
 
 func (s *clientService) PreRun(ctx context.Context) error {
-       var err error
-       s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
-               schema.Namespace(s.namespace),
-               schema.ConfigureServerEndpoints(s.endpoints),
-               schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
-               schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
-               schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, 
s.etcdTLSKeyFile),
-       )
-       if err != nil {
+       stopCh := make(chan struct{})
+       sn := make(chan os.Signal, 1)
+       l := logger.GetLogger(s.Name())
+       signal.Notify(sn,
+               syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, 
syscall.SIGTERM)
+       go func() {
+               select {
+               case si := <-sn:
+                       logger.GetLogger(s.Name()).Info().Msgf("signal 
received: %s", si)
+                       close(stopCh)
+               case <-s.closer.CloseNotify():
+                       close(stopCh)
+               }
+       }()
+
+       for {
+               var err error
+               s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
+                       schema.Namespace(s.namespace),
+                       schema.ConfigureServerEndpoints(s.endpoints),
+                       schema.ConfigureEtcdUser(s.etcdUsername, 
s.etcdPassword),
+                       schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
+                       schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, 
s.etcdTLSKeyFile),
+               )
+               if errors.Is(err, context.DeadlineExceeded) {
+                       select {
+                       case <-stopCh:
+                               return errors.New("pre-run interrupted")
+                       case <-time.After(s.registryTimeout):
+                               return errors.New("pre-run timeout")
+                       case <-s.closer.CloseNotify():
+                               return errors.New("pre-run interrupted")
+                       default:
+                               l.Warn().Strs("etcd-endpoints", 
s.endpoints).Msg("the schema registry init timeout, retrying...")
+                               time.Sleep(time.Second)
+                               continue
+                       }
+               }
+               if err == nil {
+                       break
+               }
                return err
        }
+
        val := ctx.Value(common.ContextNodeKey)
        if val == nil {
                return errors.New("node id is empty")
@@ -116,7 +154,6 @@ func (s *clientService) PreRun(ctx context.Context) error {
                return errors.New("node roles is empty")
        }
        nodeRoles := val.([]databasev1.Role)
-       l := logger.GetLogger(s.Name())
        nodeInfo := &databasev1.Node{
                Metadata: &commonv1.Metadata{
                        Name: node.NodeID,
@@ -126,15 +163,26 @@ func (s *clientService) PreRun(ctx context.Context) error {
                Roles:       nodeRoles,
                CreatedAt:   timestamppb.Now(),
        }
+       var cancel context.CancelFunc
        for {
-               ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
-               err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, 
s.forceRegisterNode)
+               ctx, cancel = context.WithTimeout(ctx, time.Second*10)
+               err := s.schemaRegistry.RegisterNode(ctx, nodeInfo, 
s.forceRegisterNode)
                cancel()
                if errors.Is(err, schema.ErrGRPCAlreadyExists) {
                        return errors.Wrapf(err, "node[%s] already exists in 
etcd", node.NodeID)
                } else if errors.Is(err, context.DeadlineExceeded) {
-                       l.Warn().Strs("etcd-endpoints", 
s.endpoints).Msg("register node timeout, retrying...")
-                       continue
+                       select {
+                       case <-stopCh:
+                               return errors.New("register node interrupted")
+                       case <-time.After(s.registryTimeout):
+                               return errors.New("register node timeout")
+                       case <-s.closer.CloseNotify():
+                               return errors.New("register node interrupted")
+                       default:
+                               l.Warn().Strs("etcd-endpoints", 
s.endpoints).Msg("register node timeout, retrying...")
+                               time.Sleep(time.Second)
+                               continue
+                       }
                }
                if err == nil {
                        l.Info().Stringer("info", nodeInfo).Msg("register node 
successfully")
diff --git a/scripts/push-release.sh b/scripts/push-release.sh
index 0a0ed328..4c41f32e 100755
--- a/scripts/push-release.sh
+++ b/scripts/push-release.sh
@@ -45,6 +45,7 @@ cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION"
 cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION"
 
 cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache 
SkyWalking BanyanDB release $VERSION"
+cd "$VERSION"
 
 cat << EOF
 =========================================================================

Reply via email to