This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new fd33494e Fix the issue that the etcd watcher gets the historical node
registration events (#651)
fd33494e is described below
commit fd33494e5555b2999e3a56f2f414fd6d1d44111d
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Apr 18 21:58:18 2025 +0800
Fix the issue that the etcd watcher gets the historical node registration
events (#651)
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
Makefile | 2 +-
banyand/metadata/schema/etcd.go | 10 ++--
banyand/metadata/schema/schema.go | 2 +-
banyand/metadata/schema/watcher_test.go | 85 +++++++++++++++++++++++++++++----
banyand/queue/pub/pub.go | 6 ++-
pkg/test/flags/flags.go | 13 +++++
7 files changed, 100 insertions(+), 19 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a0cecbf9..0da6ead6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
### Bug Fixes
- Fix the deadlock issue when loading a closed segment.
+- Fix the issue that the etcd watcher receives historical node registration events.
## 0.8.0
diff --git a/Makefile b/Makefile
index 0d6a1747..9a962cae 100644
--- a/Makefile
+++ b/Makefile
@@ -74,7 +74,7 @@ include scripts/build/ginkgo.mk
test-ci: $(GINKGO) ## Run the unit tests in CI
$(GINKGO) --race \
-ldflags \
- "-X
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
+ "-X
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X
github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
$(TEST_CI_OPTS) \
./...
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 41f66f1c..eebd6eef 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -172,7 +172,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string,
kind Kind, handler Eve
return
}
for i := range kinds {
- e.registerToWatcher(name, kinds[i], 0, handler)
+ e.registerToWatcher(name, kinds[i], -1, handler)
}
}
@@ -186,7 +186,7 @@ func (e *etcdSchemaRegistry) registerToWatcher(name string,
kind Kind, revision
return
}
e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering to
a new watcher")
- w := e.newWatcherWithRevision(name, kind, revision,
CheckInterval(e.checkInterval))
+ w := e.NewWatcher(name, kind, revision, CheckInterval(e.checkInterval))
w.AddHandler(handler)
e.watchers[kind] = w
}
@@ -578,11 +578,7 @@ func (e *etcdSchemaRegistry) revokeLease(lease
*clientv3.LeaseGrantResponse) {
}
}
-func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, opts
...WatcherOption) *watcher {
- return e.newWatcherWithRevision(name, kind, 0, opts...)
-}
-
-func (e *etcdSchemaRegistry) newWatcherWithRevision(name string, kind Kind,
revision int64, opts ...WatcherOption) *watcher {
+func (e *etcdSchemaRegistry) NewWatcher(name string, kind Kind, revision
int64, opts ...WatcherOption) *watcher {
wc := watcherConfig{
key: e.prependNamespace(kind.key()),
kind: kind,
diff --git a/banyand/metadata/schema/schema.go
b/banyand/metadata/schema/schema.go
index 472a34ed..d103536f 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -66,7 +66,7 @@ type Registry interface {
Node
Property
RegisterHandler(string, Kind, EventHandler)
- NewWatcher(string, Kind, ...WatcherOption) *watcher
+ NewWatcher(string, Kind, int64, ...WatcherOption) *watcher
Register(context.Context, Metadata, bool) error
Compact(context.Context, int64) error
StartWatcher()
diff --git a/banyand/metadata/schema/watcher_test.go
b/banyand/metadata/schema/watcher_test.go
index c2fa90dd..20c0b7e1 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -81,6 +81,7 @@ var _ = ginkgo.Describe("Watcher", func() {
server embeddedetcd.Server
registry schema.Registry
defFn func()
+ endpoints []string
)
ginkgo.BeforeEach(func() {
@@ -98,7 +99,7 @@ var _ = ginkgo.Describe("Watcher", func() {
if err != nil {
panic("fail to find free ports")
}
- endpoints := []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
+ endpoints = []string{fmt.Sprintf("http://127.0.0.1:%d",
ports[0])}
server, err = embeddedetcd.NewServer(
embeddedetcd.ConfigureListener(endpoints,
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
embeddedetcd.RootDir(path))
@@ -172,7 +173,7 @@ var _ = ginkgo.Describe("Watcher", func() {
}
// Start the watcher
- watcher := registry.NewWatcher("test", schema.KindMeasure)
+ watcher := registry.NewWatcher("test", schema.KindMeasure, 0)
watcher.AddHandler(mockedObj)
watcher.Start()
ginkgo.DeferCleanup(func() {
@@ -191,7 +192,7 @@ var _ = ginkgo.Describe("Watcher", func() {
}, flags.EventuallyTimeout).Should(gomega.BeTrue())
})
ginkgo.It("should handle watch events", func() {
- watcher := registry.NewWatcher("test", schema.KindStream)
+ watcher := registry.NewWatcher("test", schema.KindStream, 0)
watcher.AddHandler(mockedObj)
watcher.Start()
ginkgo.DeferCleanup(func() {
@@ -316,7 +317,7 @@ var _ = ginkgo.Describe("Watcher", func() {
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- watcher := registry.NewWatcher("test", schema.KindMeasure,
schema.CheckInterval(1*time.Second))
+ watcher := registry.NewWatcher("test", schema.KindMeasure, 0,
schema.CheckInterval(1*time.Second))
watcher.AddHandler(mockedObj)
watcher.Start()
ginkgo.DeferCleanup(func() {
@@ -331,7 +332,7 @@ var _ = ginkgo.Describe("Watcher", func() {
})
ginkgo.It("should detect deletions", func() {
- watcher := registry.NewWatcher("test", schema.KindMeasure,
schema.CheckInterval(1*time.Second))
+ watcher := registry.NewWatcher("test", schema.KindMeasure, 0,
schema.CheckInterval(1*time.Second))
watcher.AddHandler(mockedObj)
watcher.Start()
ginkgo.DeferCleanup(func() {
@@ -395,12 +396,12 @@ var _ = ginkgo.Describe("Watcher", func() {
gomega.Eventually(func() int {
return int(mockedObj.deleteCalledNum.Load())
- }, 5*time.Second).Should(gomega.Equal(1))
+ }, flags.EventuallyTimeout).Should(gomega.Equal(1))
gomega.Expect(mockedObj.Data()).NotTo(gomega.HaveKey(measureName))
})
ginkgo.It("should recover state after compaction", func() {
- watcher := registry.NewWatcher("test", schema.KindMeasure,
schema.CheckInterval(1*time.Hour))
+ watcher := registry.NewWatcher("test", schema.KindMeasure, 0,
schema.CheckInterval(1*time.Hour))
watcher.AddHandler(mockedObj)
watcher.Start()
ginkgo.DeferCleanup(func() {
@@ -487,6 +488,74 @@ var _ = ginkgo.Describe("Watcher", func() {
gomega.Eventually(func() int {
return int(mockedObj.addOrUpdateCalledNum.Load())
- }, 5*time.Second).Should(gomega.BeNumerically(">=", 2))
+ }, flags.EventuallyTimeout).Should(gomega.BeNumerically(">=",
2))
+ })
+
+ ginkgo.It("should not load node with revision -1", func() {
+ err := registry.RegisterNode(context.Background(),
&databasev1.Node{
+ Metadata: &commonv1.Metadata{
+ Name: "testnode",
+ },
+ Roles: []databasev1.Role{
+ databasev1.Role_ROLE_DATA,
+ },
+ }, false)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ nn, err := registry.ListNode(context.Background(),
databasev1.Role_ROLE_DATA)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(len(nn)).To(gomega.Equal(1))
+ gomega.Expect(nn[0].Metadata.Name).To(gomega.Equal("testnode"))
+
+ err = registry.Close()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ // Recreate registry for this test
+ registry, err = schema.NewEtcdSchemaRegistry(
+ schema.Namespace("test"),
+ schema.ConfigureServerEndpoints(endpoints),
+ )
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ watcher := registry.NewWatcher("test", schema.KindNode, -1,
schema.CheckInterval(1*time.Hour))
+ watcher.AddHandler(mockedObj)
+ watcher.Start()
+ ginkgo.DeferCleanup(func() {
+ watcher.Close()
+ })
+ gomega.Consistently(func() int {
+ return int(mockedObj.addOrUpdateCalledNum.Load())
+ }, flags.ConsistentlyTimeout).Should(gomega.BeZero())
+ })
+
+ ginkgo.It("should load and delete node with revision 0", func() {
+ // Register node again for this test
+ err := registry.RegisterNode(context.Background(),
&databasev1.Node{
+ Metadata: &commonv1.Metadata{
+ Name: "testnode",
+ },
+ Roles: []databasev1.Role{
+ databasev1.Role_ROLE_DATA,
+ },
+ }, false)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ err = registry.Close()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ // Recreate registry for this test
+ registry, err = schema.NewEtcdSchemaRegistry(
+ schema.Namespace("test"),
+ schema.ConfigureServerEndpoints(endpoints),
+ )
+ gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+ watcher := registry.NewWatcher("test", schema.KindNode, 0,
schema.CheckInterval(1*time.Hour))
+ watcher.AddHandler(mockedObj)
+ watcher.Start()
+ ginkgo.DeferCleanup(func() {
+ watcher.Close()
+ })
+ gomega.Eventually(func() int {
+ return int(mockedObj.addOrUpdateCalledNum.Load())
+ }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+ gomega.Eventually(func() int {
+ return int(mockedObj.deleteCalledNum.Load())
+ }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+ gomega.Expect(mockedObj.Data()).To(gomega.BeEmpty())
})
})
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d49baeef..8cfc0ea1 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -95,7 +95,9 @@ func (p *pub) Broadcast(timeout time.Duration, topic
bus.Topic, messages bus.Mes
var nodes []*databasev1.Node
p.mu.RLock()
for k := range p.active {
- nodes = append(nodes, p.registered[k])
+ if n := p.registered[k]; n != nil {
+ nodes = append(nodes, n)
+ }
}
p.mu.RUnlock()
if len(nodes) == 0 {
@@ -104,7 +106,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic
bus.Topic, messages bus.Mes
names := make(map[string]struct{})
if len(messages.NodeSelectors()) == 0 {
for _, n := range nodes {
- names[n.Metadata.Name] = struct{}{}
+ names[n.Metadata.GetName()] = struct{}{}
}
} else {
for g, sel := range messages.NodeSelectors() {
diff --git a/pkg/test/flags/flags.go b/pkg/test/flags/flags.go
index 0cbd084f..a870d66f 100644
--- a/pkg/test/flags/flags.go
+++ b/pkg/test/flags/flags.go
@@ -27,9 +27,14 @@ var (
neverTimeout string
+ consistentlyTimeout string
+
// EventuallyTimeout is the timeout of async time cases execution.
EventuallyTimeout time.Duration
+ // ConsistentlyTimeout is the duration over which Consistently assertions keep checking that their condition holds.
+ ConsistentlyTimeout time.Duration
+
// NeverTimeout is the timeout of async time cases execution.
NeverTimeout time.Duration
@@ -44,6 +49,9 @@ func init() {
if neverTimeout == "" {
neverTimeout = "2s"
}
+ if consistentlyTimeout == "" {
+ consistentlyTimeout = "5s"
+ }
d, err := time.ParseDuration(eventuallyTimeout)
if err != nil {
panic(err)
@@ -54,4 +62,9 @@ func init() {
panic(err)
}
NeverTimeout = d
+ d, err = time.ParseDuration(consistentlyTimeout)
+ if err != nil {
+ panic(err)
+ }
+ ConsistentlyTimeout = d
}