This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a69ec569 Fix the data node can't re-register to etcd (#471)
a69ec569 is described below
commit a69ec5693ee5bca4a0b65e73b269988a5660999d
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jun 20 13:45:54 2024 +0800
Fix the data node can't re-register to etcd (#471)
---
CHANGES.md | 1 +
banyand/internal/storage/segment.go | 40 ++++++----
banyand/internal/storage/version.go | 28 +++++--
banyand/metadata/schema/etcd.go | 125 ++++++++++++++++++++++++-------
banyand/metadata/schema/etcd_test.go | 10 +--
banyand/metadata/schema/register_test.go | 35 ++++++++-
banyand/metadata/schema/watcher_test.go | 9 ++-
pkg/logger/logger.go | 11 ++-
8 files changed, 200 insertions(+), 59 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 89623919..52332c17 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
- Fix the filtering of stream in descending order by timestamp.
- Fix querying old data points when the data is in a newer part. A version
column is introduced to each data point and stored in the timestamp file.
- Fix the bug that duplicated data points from different data nodes are
returned.
+- Fix the bug that the data node can't re-register to etcd when the connection is lost.
## 0.6.1
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 3ad6d984..7c70e448 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -19,8 +19,8 @@ package storage
import (
"context"
- "errors"
"fmt"
+ "io/fs"
"path"
"path/filepath"
"sort"
@@ -29,8 +29,11 @@ import (
"sync/atomic"
"time"
+ "github.com/pkg/errors"
+
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/bucket"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -209,25 +212,36 @@ func (sc *segmentController[T, O]) Parse(value string)
(time.Time, error) {
func (sc *segmentController[T, O]) open() error {
sc.Lock()
defer sc.Unlock()
- return loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize,
func(start, end time.Time) error {
- compatibleVersions, err := readCompatibleVersions()
- if err != nil {
- return err
- }
+ emptySegments := make([]string, 0)
+ err := loadSegments(sc.location, segPathPrefix, sc, sc.segmentSize,
func(start, end time.Time) error {
suffix := sc.Format(start)
- metadataPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
suffix), metadataFilename)
+ segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
suffix))
+ metadataPath := path.Join(segmentPath, metadataFilename)
version, err := lfs.Read(metadataPath)
if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ emptySegments = append(emptySegments,
segmentPath)
+ return nil
+ }
return err
}
- for _, cv := range compatibleVersions[compatibleVersionsKey] {
- if string(version) == cv {
- _, err := sc.load(start, end, sc.location)
- return err
- }
+ if len(version) == 0 {
+ emptySegments = append(emptySegments, segmentPath)
+ return nil
+ }
+ if err = checkVersion(convert.BytesToString(version)); err !=
nil {
+ return err
}
- return errVersionIncompatible
+ _, err = sc.load(start, end, sc.location)
+ return err
})
+ if len(emptySegments) > 0 {
+ sc.l.Warn().Strs("segments", emptySegments).Msg("empty segments
found, removing them.")
+ for i := range emptySegments {
+ lfs.MustRMAll(emptySegments[i])
+ }
+ }
+ return err
}
func (sc *segmentController[T, O]) create(start time.Time) (*segment[T],
error) {
diff --git a/banyand/internal/storage/version.go
b/banyand/internal/storage/version.go
index e1cbbd06..ddae25c4 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -20,8 +20,9 @@ package storage
import (
"embed"
"encoding/json"
- "errors"
+ "strings"
+ "github.com/pkg/errors"
"sigs.k8s.io/yaml"
)
@@ -34,21 +35,36 @@ const (
var errVersionIncompatible = errors.New("version not compatible")
+var compatibleVersions = readCompatibleVersions()
+
//go:embed versions.yml
var versionFS embed.FS
-func readCompatibleVersions() (map[string][]string, error) {
+func checkVersion(version string) error {
+ for _, v := range compatibleVersions {
+ if v == version {
+ return nil
+ }
+ }
+ return errors.WithMessagef(errVersionIncompatible, "incompatible
version %s, supported versions: %s", version, strings.Join(compatibleVersions,
", "))
+}
+
+func readCompatibleVersions() []string {
i, err := versionFS.ReadFile(compatibleVersionsFilename)
if err != nil {
- return nil, err
+ panic(err)
}
j, err := yaml.YAMLToJSON(i)
if err != nil {
- return nil, err
+ panic(err)
}
var compatibleVersions map[string][]string
if err := json.Unmarshal(j, &compatibleVersions); err != nil {
- return nil, err
+ panic(err)
+ }
+ vv, ok := compatibleVersions[compatibleVersionsKey]
+ if !ok {
+ panic("versions not found")
}
- return compatibleVersions, nil
+ return vv
}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 249df6e2..38b5615a 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -380,82 +380,151 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context,
metadata Metadata) (boo
return false, nil
}
+const leaseDuration = 5 * time.Second
+
func (e *etcdSchemaRegistry) Register(ctx context.Context, metadata Metadata,
forced bool) error {
if !e.closer.AddRunning() {
return ErrClosed
}
defer e.closer.Done()
- key, err := metadata.key()
+
+ key, err := e.prepareKey(metadata)
if err != nil {
return err
}
- key = e.prependNamespace(key)
- val, err := proto.Marshal(metadata.Spec.(proto.Message))
+
+ val, err := e.prepareValue(metadata)
if err != nil {
return err
}
- // Create a lease with a short TTL
- lease, err := e.client.Grant(ctx, 5) // 5 seconds
+
+ lease, err := e.client.Grant(ctx, int64(leaseDuration.Seconds()))
if err != nil {
+ return fmt.Errorf("failed to grant lease for key %s: %w", key,
err)
+ }
+
+ if err := e.putKeyVal(ctx, key, val, lease, forced); err != nil {
return err
}
+
+ //nolint:contextcheck
+ if err := e.keepLeaseAlive(lease, key, val); err != nil {
+ return fmt.Errorf("failed to keep lease alive for key %s: %w",
key, err)
+ }
+
+ return nil
+}
+
+func (e *etcdSchemaRegistry) prepareKey(metadata Metadata) (string, error) {
+ key, err := metadata.key()
+ if err != nil {
+ return "", err
+ }
+ return e.prependNamespace(key), nil
+}
+
+func (e *etcdSchemaRegistry) prepareValue(metadata Metadata) (string, error) {
+ val, err := proto.Marshal(metadata.Spec.(proto.Message))
+ if err != nil {
+ return "", err
+ }
+ return string(val), nil
+}
+
+func (e *etcdSchemaRegistry) putKeyVal(ctx context.Context, key, val string,
lease *clientv3.LeaseGrantResponse, forced bool) error {
if forced {
- if _, err = e.client.Put(ctx, key, string(val),
clientv3.WithLease(lease.ID)); err != nil {
- return err
+ if _, err := e.client.Put(ctx, key, val,
clientv3.WithLease(lease.ID)); err != nil {
+ return fmt.Errorf("failed to forcefully put key-value
pair for key %s: %w", key, err)
}
} else {
- var ops []clientv3.Cmp
- ops = append(ops,
clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
+ ops :=
[]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision(key), "=", 0)}
txn := e.client.Txn(ctx).If(ops...)
- txn = txn.Then(clientv3.OpPut(key, string(val),
clientv3.WithLease(lease.ID)))
+ txn = txn.Then(clientv3.OpPut(key, val,
clientv3.WithLease(lease.ID)))
txn = txn.Else(clientv3.OpGet(key))
- response, errCommit := txn.Commit()
- if errCommit != nil {
- return errCommit
+ response, err := txn.Commit()
+ if err != nil {
+ return fmt.Errorf("failed to commit transaction for key
%s: %w", key, err)
}
-
if !response.Succeeded {
tr := pb.TxnResponse(*response)
- return errors.Wrapf(ErrGRPCAlreadyExists, "response:
%s", tr.String())
+ return errors.Wrapf(ErrGRPCAlreadyExists, "key %s,
response: %s", key, tr.String())
}
}
+ return nil
+}
- // Keep the lease alive
- // nolint:contextcheck
+func (e *etcdSchemaRegistry) keepLeaseAlive(lease
*clientv3.LeaseGrantResponse, key, val string) error {
keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID)
if err != nil {
- return err
+ return fmt.Errorf("failed to keep lease alive for key %s: %w",
key, err)
}
- // nolint:contextcheck
+
go func() {
if !e.closer.AddRunning() {
return
}
defer func() {
- e.l.Info().Msgf("revoking lease %d", lease.ID)
- ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second)
- _, err = e.client.Lease.Revoke(ctx, lease.ID)
- cancel()
- if err != nil {
- e.l.Error().Err(err).Msgf("failed to revoke
lease %d", lease.ID)
- }
+ e.revokeLease(lease)
e.closer.Done()
}()
+
for {
select {
case <-e.closer.CloseNotify():
return
case keepAliveResp := <-keepAliveChan:
if keepAliveResp == nil {
- // The channel has been closed
- return
+ keepAliveChan =
e.revokeAndReconnectLease(lease, key, val)
}
}
}
}()
+
return nil
}
+func (e *etcdSchemaRegistry) revokeAndReconnectLease(lease
*clientv3.LeaseGrantResponse, key, val string) <-chan
*clientv3.LeaseKeepAliveResponse {
+ for {
+ e.revokeLease(lease)
+ select {
+ case <-e.closer.CloseNotify():
+ return nil
+ default:
+ lease, err := e.client.Grant(context.Background(),
int64(leaseDuration.Seconds()))
+ if err != nil {
+ e.l.Error().Err(err).Msg("failed to grant
lease")
+ time.Sleep(leaseDuration)
+ continue
+ }
+ _, err = e.client.Put(context.Background(), key, val,
clientv3.WithLease(lease.ID))
+ if err != nil {
+ e.l.Error().Err(err).Msg("failed to put
key-value pair")
+ time.Sleep(leaseDuration)
+ continue
+ }
+ keepAliveChan, err :=
e.client.KeepAlive(context.Background(), lease.ID)
+ if err != nil {
+ e.l.Error().Err(err).Msg("failed to keep alive")
+ time.Sleep(leaseDuration)
+ } else {
+ return keepAliveChan
+ }
+ }
+ }
+}
+
+func (e *etcdSchemaRegistry) revokeLease(lease *clientv3.LeaseGrantResponse) {
+ if lease == nil {
+ return
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), leaseDuration)
+ defer cancel()
+ _, err := e.client.Lease.Revoke(ctx, lease.ID)
+ if err != nil {
+ e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
+ }
+}
+
func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, handler
watchEventHandler) *watcher {
return e.newWatcherWithRevision(name, kind, 0, handler)
}
diff --git a/banyand/metadata/schema/etcd_test.go
b/banyand/metadata/schema/etcd_test.go
index 6bb877ee..d288d8cb 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -21,11 +21,9 @@ import (
"context"
"embed"
"fmt"
- "os"
"path"
"testing"
- "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
@@ -100,11 +98,8 @@ func preloadSchema(e schema.Registry) error {
return nil
}
-func randomTempDir() string {
- return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s",
uuid.New().String()))
-}
-
func initServerAndRegister(t *testing.T) (schema.Registry, func()) {
+ path, defFn := test.Space(require.New(t))
req := require.New(t)
ports, err := test.AllocateFreePorts(2)
if err != nil {
@@ -113,7 +108,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry,
func()) {
endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d", ports[0])}
server, err := embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(randomTempDir()))
+ embeddedetcd.RootDir(path))
req.NoError(err)
req.NotNil(server)
<-server.ReadyNotify()
@@ -124,6 +119,7 @@ func initServerAndRegister(t *testing.T) (schema.Registry,
func()) {
server.Close()
<-server.StopNotify()
schemaRegistry.Close()
+ defFn()
}
}
diff --git a/banyand/metadata/schema/register_test.go
b/banyand/metadata/schema/register_test.go
index 8e373cfc..1e9ceef8 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -20,6 +20,8 @@ package schema_test
import (
"context"
"fmt"
+ "os"
+ "time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -34,7 +36,9 @@ import (
)
var _ = ginkgo.Describe("etcd_register", func() {
- var endpoints []string
+ var path string
+ var defFn func()
+ var endpoints, peers []string
var goods []gleak.Goroutine
var server embeddedetcd.Server
var r schema.Registry
@@ -51,13 +55,17 @@ var _ = ginkgo.Describe("etcd_register", func() {
},
}
ginkgo.BeforeEach(func() {
+ var err error
+ path, defFn, err = test.NewSpace()
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
goods = gleak.Goroutines()
ports, err := test.AllocateFreePorts(2)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
+ peers = []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}
server, err = embeddedetcd.NewServer(
- embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(randomTempDir()))
+ embeddedetcd.ConfigureListener(endpoints, peers),
+ embeddedetcd.RootDir(path))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
r, err = schema.NewEtcdSchemaRegistry(
@@ -70,6 +78,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
gomega.Expect(r.Close()).ShouldNot(gomega.HaveOccurred())
server.Close()
gomega.Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ defFn()
})
ginkgo.It("should revoke the leaser", func() {
@@ -89,4 +98,24 @@ var _ = ginkgo.Describe("etcd_register", func() {
gomega.Expect(r.Register(context.Background(), md,
false)).ShouldNot(gomega.HaveOccurred())
gomega.Expect(r.Register(context.Background(), md,
false)).Should(gomega.MatchError(schema.ErrGRPCAlreadyExists))
})
+
+ ginkgo.It("should reconnect", func() {
+ gomega.Expect(r.Register(context.Background(), md,
true)).ShouldNot(gomega.HaveOccurred())
+ _, err := r.GetNode(context.Background(), node)
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ gomega.Expect(server.Close()).ShouldNot(gomega.HaveOccurred())
+ time.Sleep(1 * time.Second)
+ os.RemoveAll(path)
+
+ server, err = embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener(endpoints, peers),
+ embeddedetcd.RootDir(path))
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ <-server.ReadyNotify()
+
+ gomega.Eventually(func() error {
+ _, err := r.GetNode(context.Background(), node)
+ return err
+ }, flags.EventuallyTimeout).ShouldNot(gomega.HaveOccurred())
+ })
})
diff --git a/banyand/metadata/schema/watcher_test.go
b/banyand/metadata/schema/watcher_test.go
index 472f863c..65c45b40 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -79,6 +79,7 @@ var _ = ginkgo.Describe("Watcher", func() {
mockedObj *mockedHandler
server embeddedetcd.Server
registry schema.Registry
+ defFn func()
)
ginkgo.BeforeEach(func() {
@@ -87,6 +88,11 @@ var _ = ginkgo.Describe("Watcher", func() {
Env: "dev",
Level: flags.LogLevel,
})).To(gomega.Succeed())
+ var path string
+ var err error
+ path, defFn, err = test.NewSpace()
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
ports, err := test.AllocateFreePorts(2)
if err != nil {
panic("fail to find free ports")
@@ -94,7 +100,7 @@ var _ = ginkgo.Describe("Watcher", func() {
endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
server, err = embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
- embeddedetcd.RootDir(randomTempDir()))
+ embeddedetcd.RootDir(path))
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
<-server.ReadyNotify()
registry, err = schema.NewEtcdSchemaRegistry(
@@ -107,6 +113,7 @@ var _ = ginkgo.Describe("Watcher", func() {
registry.Close()
server.Close()
<-server.StopNotify()
+ defFn()
})
ginkgo.It("should handle all existing key-value pairs on initial load",
func() {
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 9743503f..b967083a 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -96,7 +96,16 @@ func (l *Logger) ToZapConfig() zap.Config {
}
if !l.development {
config := zap.NewProductionConfig()
- config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
+ switch l.GetLevel() {
+ case zerolog.DebugLevel:
+ config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+ case zerolog.InfoLevel:
+ config.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
+ case zerolog.WarnLevel:
+ config.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
+ default:
+ config.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
+ }
return config
}
encoderConfig := zapcore.EncoderConfig{