This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b7d397b2 Reconnect to the etcd in the startup phase (#538)
b7d397b2 is described below
commit b7d397b2bd51c04509afabbd9a744ba6beaa218c
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 19 14:24:47 2024 +0800
Reconnect to the etcd in the startup phase (#538)
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/metadata/client.go | 76 +++++++++++++++++++++++++++++++++++++---------
scripts/push-release.sh | 1 +
3 files changed, 64 insertions(+), 14 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 20cd8b38..22c7ee35 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -42,6 +42,7 @@ Release Notes.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
- Fix panic when removing an expired segment.
- Fix panic when reading a disorder block of measure. This block's versions
are not sorted in descending order.
+- Fix the bug where the etcd client doesn't reconnect after hitting a context
timeout in the startup phase.
### Documentation
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 61ba8164..94a7f1bd 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -19,6 +19,9 @@ package metadata
import (
"context"
+ "os"
+ "os/signal"
+ "syscall"
"time"
"github.com/pkg/errors"
@@ -68,6 +71,7 @@ type clientService struct {
etcdTLSCertFile string
etcdTLSKeyFile string
endpoints []string
+ registryTimeout time.Duration
forceRegisterNode bool
}
@@ -84,6 +88,7 @@ func (s *clientService) FlagSet() *run.FlagSet {
fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted
certificate authority")
fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client
certificate")
fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key
for the etcd client certificate.")
+ fs.DurationVar(&s.registryTimeout, "node-registry-timeout",
2*time.Minute, "The timeout for the node registry")
return fs
}
@@ -95,17 +100,50 @@ func (s *clientService) Validate() error {
}
func (s *clientService) PreRun(ctx context.Context) error {
- var err error
- s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
- schema.Namespace(s.namespace),
- schema.ConfigureServerEndpoints(s.endpoints),
- schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
- schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
- schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile,
s.etcdTLSKeyFile),
- )
- if err != nil {
+ stopCh := make(chan struct{})
+ sn := make(chan os.Signal, 1)
+ l := logger.GetLogger(s.Name())
+ signal.Notify(sn,
+ syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT,
syscall.SIGTERM)
+ go func() {
+ select {
+ case si := <-sn:
+ logger.GetLogger(s.Name()).Info().Msgf("signal
received: %s", si)
+ close(stopCh)
+ case <-s.closer.CloseNotify():
+ close(stopCh)
+ }
+ }()
+
+ for {
+ var err error
+ s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
+ schema.Namespace(s.namespace),
+ schema.ConfigureServerEndpoints(s.endpoints),
+ schema.ConfigureEtcdUser(s.etcdUsername,
s.etcdPassword),
+ schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
+ schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile,
s.etcdTLSKeyFile),
+ )
+ if errors.Is(err, context.DeadlineExceeded) {
+ select {
+ case <-stopCh:
+ return errors.New("pre-run interrupted")
+ case <-time.After(s.registryTimeout):
+ return errors.New("pre-run timeout")
+ case <-s.closer.CloseNotify():
+ return errors.New("pre-run interrupted")
+ default:
+ l.Warn().Strs("etcd-endpoints",
s.endpoints).Msg("the schema registry init timeout, retrying...")
+ time.Sleep(time.Second)
+ continue
+ }
+ }
+ if err == nil {
+ break
+ }
return err
}
+
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
@@ -116,7 +154,6 @@ func (s *clientService) PreRun(ctx context.Context) error {
return errors.New("node roles is empty")
}
nodeRoles := val.([]databasev1.Role)
- l := logger.GetLogger(s.Name())
nodeInfo := &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
@@ -126,15 +163,26 @@ func (s *clientService) PreRun(ctx context.Context) error
{
Roles: nodeRoles,
CreatedAt: timestamppb.Now(),
}
+ var cancel context.CancelFunc
for {
- ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
- err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo,
s.forceRegisterNode)
+ ctx, cancel = context.WithTimeout(ctx, time.Second*10)
+ err := s.schemaRegistry.RegisterNode(ctx, nodeInfo,
s.forceRegisterNode)
cancel()
if errors.Is(err, schema.ErrGRPCAlreadyExists) {
return errors.Wrapf(err, "node[%s] already exists in
etcd", node.NodeID)
} else if errors.Is(err, context.DeadlineExceeded) {
- l.Warn().Strs("etcd-endpoints",
s.endpoints).Msg("register node timeout, retrying...")
- continue
+ select {
+ case <-stopCh:
+ return errors.New("register node interrupted")
+ case <-time.After(s.registryTimeout):
+ return errors.New("register node timeout")
+ case <-s.closer.CloseNotify():
+ return errors.New("register node interrupted")
+ default:
+ l.Warn().Strs("etcd-endpoints",
s.endpoints).Msg("register node timeout, retrying...")
+ time.Sleep(time.Second)
+ continue
+ }
}
if err == nil {
l.Info().Stringer("info", nodeInfo).Msg("register node
successfully")
diff --git a/scripts/push-release.sh b/scripts/push-release.sh
index 0a0ed328..4c41f32e 100755
--- a/scripts/push-release.sh
+++ b/scripts/push-release.sh
@@ -45,6 +45,7 @@ cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION"
cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION"
cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache
SkyWalking BanyanDB release $VERSION"
+cd "$VERSION"
cat << EOF
=========================================================================