This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch publish
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e438ad75162f954c37db504e5836f94fe2f9b916
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 16 02:42:25 2024 +0000

    Update doc

    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/dquery/topn.go                             |  10 +-
 banyand/queue/local.go                             |   2 +-
 banyand/queue/pub/client.go                        |  65 ++++++++----
 banyand/queue/pub/client_test.go                   |  21 +++-
 banyand/queue/pub/pub.go                           |  76 ++++++++-----
 banyand/queue/pub/pub_suite_test.go                |  21 +++-
 banyand/queue/pub/pub_test.go                      | 117 ++++++++++++++++++---
 docs/concept/clustering.md                         |  14 +++
 pkg/bus/bus.go                                     |   3 +-
 .../logical/measure/measure_plan_distributed.go    |   7 +-
 .../logical/stream/stream_plan_distributed.go      |   7 +-
 test/cases/measure/measure.go                      |   2 +-
 12 files changed, 273 insertions(+), 72 deletions(-)

diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index f5d7cf1f..ad666a45 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -18,6 +18,8 @@
 package dquery

 import (
+	"time"
+
 	"go.uber.org/multierr"

 	"github.com/apache/skywalking-banyandb/api/common"
@@ -31,6 +33,8 @@ import (
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )

+const defaultTopNQueryTimeout = 10 * time.Second
+
 type topNQueryProcessor struct {
 	broadcaster bus.Broadcaster
 	*queryService
@@ -52,7 +56,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	agg := request.Agg
 	request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
 	now := bus.MessageID(request.TimeRange.Begin.Nanos)
-	ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
+	ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request))
 	if err != nil {
 		resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err))
 		return
@@ -87,6 +91,10 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 			}
 		}
 	}
+	if allErr != nil {
+		resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), allErr))
+		return
+	}
 	if tags == nil {
 		resp = bus.NewMessage(now, &measurev1.TopNResponse{})
 		return
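The topn.go hunks above switch the processor to a timeout-bounded broadcast and reject the whole query when any node failed. For readers skimming the diff, here is a minimal, self-contained sketch of that fan-out-and-aggregate pattern; it is plain Go with a fake RPC, and `broadcast`, `result`, and the node names are illustrative stand-ins, not BanyanDB's bus API:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // result mirrors the future-per-node idea: each node either returns a
    // payload or an error, and the caller aggregates both.
    type result struct {
        node    string
        payload string
        err     error
    }

    // broadcast fans a request out to every node under a shared timeout and
    // collects one result per node, like Broadcast(timeout, topic, message).
    func broadcast(ctx context.Context, timeout time.Duration, nodes []string) ([]string, error) {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()

        ch := make(chan result, len(nodes))
        for _, n := range nodes {
            go func(n string) {
                select {
                case <-time.After(10 * time.Millisecond): // pretend RPC latency
                    ch <- result{node: n, payload: "ok from " + n}
                case <-ctx.Done():
                    ch <- result{node: n, err: fmt.Errorf("node %s: %w", n, ctx.Err())}
                }
            }(n)
        }

        var payloads []string
        var allErr error
        for range nodes {
            r := <-ch
            if r.err != nil {
                // Like the topn.go hunk: remember the error instead of
                // failing fast, then reject the whole query at the end.
                allErr = errors.Join(allErr, r.err)
                continue
            }
            payloads = append(payloads, r.payload)
        }
        if allErr != nil {
            return nil, allErr
        }
        return payloads, nil
    }

    func main() {
        out, err := broadcast(context.Background(), time.Second, []string{"node1", "node2"})
        fmt.Println(out, err)
    }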
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 58c67781..75cfb47d 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -65,7 +65,7 @@ func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, error) {
 	return l.local.Publish(topic, message...)
 }

-func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) {
+func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) {
 	f, err := l.Publish(topic, message)
 	if err != nil {
 		return nil, err
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 7bdc77da..b82dbc67 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -97,11 +97,13 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
 	p.mu.Lock()
 	defer p.mu.Unlock()

+	p.registered[name] = struct{}{}
+
 	// If the client already exists, just return
-	if _, ok := p.clients[name]; ok {
+	if _, ok := p.active[name]; ok {
 		return
 	}
-	if _, ok := p.evictClients[name]; ok {
+	if _, ok := p.evictable[name]; ok {
 		return
 	}
 	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
@@ -111,15 +113,16 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
 	}

 	if !p.checkClient(conn, md) {
+		p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
 		return
 	}

 	c := clusterv1.NewServiceClient(conn)
-	p.clients[name] = &client{conn: conn, client: c, md: md}
+	p.active[name] = &client{conn: conn, client: c, md: md}
 	if p.handler != nil {
 		p.handler.OnAddOrUpdate(md)
 	}
-	p.log.Info().Stringer("node", node).Msg("new node is healthy, add it to active queue")
+	p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue")
 }

 func (p *pub) OnDelete(md schema.Metadata) {
@@ -138,20 +141,21 @@ func (p *pub) OnDelete(md schema.Metadata) {
 	}
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	if en, ok := p.evictClients[name]; ok {
+	delete(p.registered, name)
+	if en, ok := p.evictable[name]; ok {
 		close(en.c)
-		delete(p.evictClients, name)
-		p.log.Info().Stringer("node", node).Msg("node is removed from evict queue by delete event")
+		delete(p.evictable, name)
+		p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from evict queue by delete event")
 		return
 	}

-	if client, ok := p.clients[name]; ok && !p.healthCheck(node, client.conn) {
+	if client, ok := p.active[name]; ok && !p.healthCheck(node, client.conn) {
 		_ = client.conn.Close()
-		delete(p.clients, name)
+		delete(p.active, name)
 		if p.handler != nil {
 			p.handler.OnDelete(md)
 		}
-		p.log.Info().Stringer("node", node).Msg("node is removed from active queue by delete event")
+		p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from active queue by delete event")
 	}
 }

@@ -168,9 +172,8 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
 	if !p.closer.AddRunning() {
 		return false
 	}
-	p.log.Info().Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
 	name := node.Metadata.Name
-	p.evictClients[name] = evictNode{n: node, c: make(chan struct{})}
+	p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
 	if p.handler != nil {
 		p.handler.OnDelete(md)
 	}
@@ -184,22 +187,25 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
 			if errEvict == nil && p.healthCheck(en.n, connEvict) {
 				p.mu.Lock()
 				defer p.mu.Unlock()
-				if _, ok := p.evictClients[name]; !ok {
+				if _, ok := p.evictable[name]; !ok {
 					// The client has been removed from evict clients map, just return
 					return
 				}
 				c := clusterv1.NewServiceClient(connEvict)
-				p.clients[name] = &client{conn: connEvict, client: c, md: md}
+				p.active[name] = &client{conn: connEvict, client: c, md: md}
 				if p.handler != nil {
 					p.handler.OnAddOrUpdate(md)
 				}
-				delete(p.evictClients, name)
+				delete(p.evictable, name)
 				p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
 				return
 			}
 			if errEvict != nil {
 				_ = connEvict.Close()
 			}
+			if _, ok := p.registered[name]; !ok {
+				return
+			}
 			p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
 		case <-en.c:
 			return
@@ -210,7 +216,7 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
 			backoff = maxBackoff
 		}
 	}
-	}(p, name, p.evictClients[name], md)
+	}(p, name, p.evictable[name], md)
 	return false
 }

@@ -237,20 +243,37 @@ func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
 func (p *pub) failover(node string) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	if en, ok := p.evictClients[node]; ok {
+	if en, ok := p.evictable[node]; ok {
 		close(en.c)
-		delete(p.evictClients, node)
-		p.log.Info().Str("node", node).Msg("node is removed from evict queue by wire event")
+		delete(p.evictable, node)
+		p.log.Info().Str("node", node).Str("status", p.dump()).Msg("node is removed from evict queue by wire event")
 		return
 	}

-	if client, ok := p.clients[node]; ok && !p.checkClient(client.conn, client.md) {
+	if client, ok := p.active[node]; ok && !p.checkClient(client.conn, client.md) {
 		_ = client.conn.Close()
-		delete(p.clients, node)
+		delete(p.active, node)
 		if p.handler != nil {
 			p.handler.OnDelete(client.md)
 		}
+		p.log.Info().Str("status", p.dump()).Str("node", node).Msg("node is unhealthy, move it to evict queue")
 	}
 }
+
+func (p *pub) dump() string {
+	keysRegistered := make([]string, 0, len(p.registered))
+	for k := range p.registered {
+		keysRegistered = append(keysRegistered, k)
+	}
+	keysActive := make([]string, 0, len(p.active))
+	for k := range p.active {
+		keysActive = append(keysActive, k)
+	}
+	keysEvictable := make([]string, 0, len(p.evictable))
+	for k := range p.evictable {
+		keysEvictable = append(keysEvictable, k)
+	}
+	return fmt.Sprintf("registered: %v, active: %v, evictable: %v", keysRegistered, keysActive, keysEvictable)
+}

 type evictNode struct {
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index 83200aa9..2ced4841 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -73,17 +73,32 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
 		gomega.Eventually(func() int {
 			p.mu.RLock()
 			defer p.mu.RUnlock()
-			return len(p.clients)
+			return len(p.active)
 		}, flags.EventuallyTimeout).Should(gomega.Equal(1))
 		verifyClients(p, 1, 0, 1, 1)
 	})
+
+	ginkgo.It("should be removed", func() {
+		addr1 := getAddress()
+		node1 := getDataNode("node1", addr1)
+		p := newPub()
+		defer p.GracefulStop()
+		closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+		p.OnAddOrUpdate(node1)
+		verifyClients(p, 1, 0, 1, 0)
+		closeFn()
+		p.failover("node1")
+		verifyClients(p, 0, 1, 1, 2)
+		p.OnDelete(node1)
+		verifyClients(p, 0, 0, 1, 2)
+	})
 })

 func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
-	gomega.Expect(p.clients).Should(gomega.HaveLen(active))
-	gomega.Expect(p.evictClients).Should(gomega.HaveLen(evict))
+	gomega.Expect(p.active).Should(gomega.HaveLen(active))
+	gomega.Expect(p.evictable).Should(gomega.HaveLen(evict))
 	h := p.handler.(*mockHandler)
 	gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
 	gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
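client.go above renames the client maps (registered/active/evictable) and keeps a background goroutine probing each evicted node with exponential backoff until it recovers, is deleted, or the publisher stops. A compact sketch of that retry loop under the same stop-channel idea; `watch` and `probe` are stand-ins, not the real API:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // watch retries a health check with exponential backoff until the node
    // recovers or stop is closed, similar in spirit to the evict-queue
    // goroutine in checkClient.
    func watch(name string, probe func() bool, stop <-chan struct{}) {
        backoff := time.Second
        const maxBackoff = 2 * time.Minute
        for {
            select {
            case <-time.After(backoff):
                if probe() {
                    fmt.Println(name, "is healthy again, back to active queue")
                    return
                }
                fmt.Println(name, "still unhealthy, retrying after", backoff)
            case <-stop:
                // OnDelete closes this channel so the loop exits promptly.
                return
            }
            if backoff < maxBackoff {
                backoff *= 2
            } else {
                backoff = maxBackoff
            }
        }
    }

    func main() {
        stop := make(chan struct{})
        go watch("node1", func() bool { return rand.Intn(4) == 0 }, stop)
        time.Sleep(5 * time.Second)
        close(stop)
    }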
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 200b2b9d..dc650cfa 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -27,6 +27,8 @@ import (
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -47,13 +49,14 @@ var (
 type pub struct {
 	schema.UnimplementedOnInitHandler
-	metadata     metadata.Repo
-	handler      schema.EventHandler
-	log          *logger.Logger
-	clients      map[string]*client
-	evictClients map[string]evictNode
-	closer       *run.Closer
-	mu           sync.RWMutex
+	metadata   metadata.Repo
+	handler    schema.EventHandler
+	log        *logger.Logger
+	registered map[string]struct{}
+	active     map[string]*client
+	evictable  map[string]evictNode
+	closer     *run.Closer
+	mu         sync.RWMutex
 }

 func (p *pub) Register(handler schema.EventHandler) {
@@ -63,16 +66,16 @@ func (p *pub) GracefulStop() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	for i := range p.evictClients {
-		close(p.evictClients[i].c)
+	for i := range p.evictable {
+		close(p.evictable[i].c)
 	}
-	p.evictClients = nil
+	p.evictable = nil
 	p.closer.Done()
 	p.closer.CloseThenWait()
-	for _, c := range p.clients {
+	for _, c := range p.active {
 		_ = c.conn.Close()
 	}
-	p.clients = nil
+	p.active = nil
 }

 // Serve implements run.Service.
@@ -80,10 +83,10 @@ func (p *pub) Serve() run.StopNotify {
 	return p.closer.CloseNotify()
 }

-func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
+func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
 	var names []string
 	p.mu.RLock()
-	for k := range p.clients {
+	for k := range p.active {
 		names = append(names, k)
 	}
 	p.mu.RUnlock()
@@ -93,7 +96,7 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
 		wg.Add(1)
 		go func(n string) {
 			defer wg.Done()
-			f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+			f, err := p.publish(timeout, topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
 			futureCh <- publishResult{n: n, f: f, e: err}
 		}(n)
 	}
@@ -106,6 +109,14 @@ func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
 	for f := range futureCh {
 		if f.e != nil {
 			errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n))
+			if isFailoverError(f.e) {
+				if p.closer.AddRunning() {
+					go func() {
+						defer p.closer.Done()
+						p.failover(f.n)
+					}()
+				}
+			}
 			continue
 		}
 		futures = append(futures, f.f)
@@ -123,7 +134,7 @@ type publishResult struct {
 	n string
 }

-func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 	var err error
 	f := &future{}
 	handleMessage := func(m bus.Message, err error) error {
@@ -133,12 +144,12 @@ func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 		}
 		node := m.Node()
 		p.mu.RLock()
-		client, ok := p.clients[node]
+		client, ok := p.active[node]
 		p.mu.RUnlock()
 		if !ok {
 			return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
 		}
-		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 		f.cancelFn = append(f.cancelFn, cancel)
 		stream, errCreateStream := client.client.Send(ctx)
 		if errCreateStream != nil {
@@ -158,6 +169,10 @@ func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 	return f, err
 }

+func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+	return p.publish(5*time.Second, topic, messages...)
+}
+
 // NewBatchPublisher returns a new batch publisher.
 func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {
 	return &batchPublisher{
@@ -171,10 +186,11 @@ func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {

 // New returns a new queue client.
 func New(metadata metadata.Repo) queue.Client {
 	return &pub{
-		metadata:     metadata,
-		clients:      make(map[string]*client),
-		evictClients: make(map[string]evictNode),
-		closer:       run.NewCloser(1),
+		metadata:   metadata,
+		active:     make(map[string]*client),
+		evictable:  make(map[string]evictNode),
+		registered: make(map[string]struct{}),
+		closer:     run.NewCloser(1),
 	}
 }

@@ -253,7 +269,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 	}

 	bp.pub.mu.RLock()
-	client, ok := bp.pub.clients[node]
+	client, ok := bp.pub.active[node]
 	bp.pub.mu.RUnlock()
 	if !ok {
 		err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
@@ -393,10 +409,12 @@ func (b *batchFuture) get() []string {
 				mux.Lock()
 				defer mux.Unlock()
 				// only log the error once for each node
-				if _, ok := b.errNodes[evt.n]; !ok {
+				if _, ok := b.errNodes[evt.n]; !ok && isFailoverError(evt.e) {
 					b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
+					b.errNodes[evt.n] = struct{}{}
+					return
 				}
-				b.errNodes[evt.n] = struct{}{}
+				b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
 			}()
 		}
 	}(e, &mux)
@@ -410,3 +428,11 @@ func (b *batchFuture) get() []string {
 	}
 	return result
 }
+
+func isFailoverError(err error) bool {
+	s, ok := status.FromError(err)
+	if !ok {
+		return false
+	}
+	return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
+}
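pub.go now classifies which gRPC errors should trigger failover: only transport-level conditions (codes.Unavailable, codes.DeadlineExceeded) evict a node, while application errors are surfaced to the caller unchanged. The helper is small enough to demonstrate stand-alone; this uses the real google.golang.org/grpc/status API, and only the sample inputs in main are made up:

    package main

    import (
        "fmt"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // isFailoverError mirrors the helper added in pub.go: a node is evicted
    // only when it is down or timed out, never for application errors.
    func isFailoverError(err error) bool {
        s, ok := status.FromError(err)
        if !ok {
            return false
        }
        return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
    }

    func main() {
        fmt.Println(isFailoverError(status.Error(codes.Unavailable, "connection refused"))) // true
        fmt.Println(isFailoverError(status.Error(codes.DeadlineExceeded, "slow node")))     // true
        fmt.Println(isFailoverError(status.Error(codes.InvalidArgument, "bad query")))      // false
    }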
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index 9a1c65aa..79080d9a 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -32,7 +32,9 @@ import (
 	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/anypb"

+	"github.com/apache/skywalking-banyandb/api/data"
 	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -80,10 +82,27 @@ func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
 			time.Sleep(s.latency)
 		}

+		topic, ok := data.TopicMap[req.Topic]
+
+		if !ok {
+			panic("invalid topic")
+		}
+		f := data.TopicResponseMap[topic]
+		var body *anypb.Any
+		if f == nil {
+			body = req.Body
+		} else {
+			var errAny error
+			body, errAny = anypb.New(f())
+			if errAny != nil {
+				panic(errAny)
+			}
+		}
+
 		res := &clusterv1.SendResponse{
 			MessageId: req.MessageId,
 			Error:     "",
-			Body:      req.Body,
+			Body:      body,
 		}
 		if err := stream.Send(res); err != nil {
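The mock server above fabricates typed response bodies by packing the message produced by data.TopicResponseMap into anypb.Any. A stand-alone illustration of that pack/unpack round trip; wrapperspb.StringValue here is only a placeholder for the real per-topic response message:

    package main

    import (
        "fmt"

        "google.golang.org/protobuf/types/known/anypb"
        "google.golang.org/protobuf/types/known/wrapperspb"
    )

    func main() {
        // Pack a concrete message into Any, as the mock server does when it
        // builds a typed response body for a query topic.
        body, err := anypb.New(wrapperspb.String("mock response"))
        if err != nil {
            panic(err)
        }
        fmt.Println(body.TypeUrl) // type.googleapis.com/google.protobuf.StringValue

        // Unpack on the receiving side.
        var s wrapperspb.StringValue
        if err := body.UnmarshalTo(&s); err != nil {
            panic(err)
        }
        fmt.Println(s.GetValue())
    }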
"github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/test/flags" ) -var _ = ginkgo.Describe("publish clients register/unregister", func() { +var _ = ginkgo.Describe("Publish and Broadcast", func() { var goods []gleak.Goroutine ginkgo.BeforeEach(func() { goods = gleak.Goroutines() @@ -39,7 +41,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) }) - ginkgo.Context("publisher and batch publisher", func() { + ginkgo.Context("Publisher and batch publisher", func() { ginkgo.It("should publish messages", func() { addr1 := getAddress() addr2 := getAddress() @@ -58,9 +60,8 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { bp := p.NewBatchPublisher(3 * time.Second) defer bp.Close() - t := bus.UniTopic("test") for i := 0; i < 10; i++ { - _, err := bp.Publish(t, + _, err := bp.Publish(data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), ) @@ -84,10 +85,9 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { node2 := getDataNode("node2", addr2) p.OnAddOrUpdate(node2) - bp := p.NewBatchPublisher(3 * time.Second) - t := bus.UniTopic("test") + bp := p.NewBatchPublisher(30 * time.Second) for i := 0; i < 10; i++ { - _, err := bp.Publish(t, + _, err := bp.Publish(data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), ) @@ -97,14 +97,14 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { gomega.Eventually(func() int { p.mu.RLock() defer p.mu.RUnlock() - return len(p.clients) + return len(p.active) }, flags.EventuallyTimeout).Should(gomega.Equal(1)) func() { p.mu.RLock() defer p.mu.RUnlock() - gomega.Expect(p.evictClients).Should(gomega.HaveLen(1)) - gomega.Expect(p.evictClients).Should(gomega.HaveKey("node2")) - gomega.Expect(p.clients).Should(gomega.HaveKey("node1")) + gomega.Expect(p.evictable).Should(gomega.HaveLen(1)) + gomega.Expect(p.evictable).Should(gomega.HaveKey("node2")) + gomega.Expect(p.active).Should(gomega.HaveKey("node1")) }() }) @@ -125,9 +125,8 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { p.OnAddOrUpdate(node2) bp := p.NewBatchPublisher(3 * time.Second) - t := bus.UniTopic("test") for i := 0; i < 10; i++ { - _, err := bp.Publish(t, + _, err := bp.Publish(data.TopicStreamWrite, bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), ) @@ -137,8 +136,98 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { gomega.Consistently(func() int { p.mu.RLock() defer p.mu.RUnlock() - return len(p.clients) + return len(p.active) }, "1s").Should(gomega.Equal(2)) }) }) + + ginkgo.Context("Broadcast", func() { + ginkgo.It("should broadcast messages", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + 
diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md
index 624bc828..3f49cda1 100644
--- a/docs/concept/clustering.md
+++ b/docs/concept/clustering.md
@@ -45,6 +45,8 @@ All nodes within a BanyanDB cluster communicate with other nodes according to th

 All nodes in the cluster are discovered by the Meta Nodes. When a node starts up, it registers itself with the Meta Nodes. The Meta Nodes then share this information with the Liaison Nodes, which use it to route requests to the appropriate nodes.

+If data nodes are unable to connect to the meta nodes due to a network partition or other issues, they will be removed from the meta nodes. However, the liaison nodes will not remove the data nodes from their routing list until the data nodes are also unreachable from the liaison nodes' perspective. This approach ensures that the system can continue to function even if some data nodes are temporarily unavailable from the meta nodes.
+
 ## 3. **Data Organization**

 Different nodes in BanyanDB are responsible for different parts of the database, while Query and Liaison Nodes manage the routing and processing of queries.
@@ -177,3 +179,15 @@ User
 4. The results from each shard are then returned to the Liaison Node, which consolidates them into a single response to the user.

 This architecture allows BanyanDB to execute queries efficiently across a distributed system, leveraging the distributed query capabilities of the Liaison Node and the parallel processing of Data Nodes.
+
+## 7. Failover
+
+BanyanDB is designed to be highly available and fault-tolerant.
+
+In case of a Data Node failure, the system can automatically recover and continue to operate.
+
+Liaison Nodes have a built-in mechanism to detect the failure of a Data Node. When a Data Node fails, the Liaison Node will automatically route requests to other available Data Nodes with the same shard. This ensures that the system remains operational even in the face of node failures. Thanks to the query mode, which allows Liaison Nodes to access all Data Nodes, the system can continue to function even if some Data Nodes are unavailable. When the failed Data Nodes are restored, the system will add them back to the routing list automatically.
+
+In the case of a Liaison Node failure, the system can be configured with multiple Liaison Nodes for redundancy. If one Liaison Node fails, the other Liaison Nodes can take over its responsibilities, ensuring that the system remains available.
+
+> Please note that any write request which triggers the failover process will be rejected, and the client should re-send the request.
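The new failover note says a write that triggers failover is rejected and the client should re-send it. One plausible client-side reaction, sketched under that assumption; `sendWrite` is a hypothetical stub (attempts must be > 0) and the backoff numbers are arbitrary:

    package main

    import (
        "errors"
        "fmt"
        "time"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // retryWrite re-sends a write that was rejected while the liaison node
    // was failing over, as the doc advises. Non-failover errors are returned
    // immediately instead of being retried.
    func retryWrite(sendWrite func() error, attempts int) error {
        var err error
        for i := 0; i < attempts; i++ {
            if err = sendWrite(); err == nil {
                return nil
            }
            if s, ok := status.FromError(err); !ok || s.Code() != codes.Unavailable {
                return err // not a failover rejection; don't retry
            }
            time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
        }
        return errors.New("write still rejected after retries: " + err.Error())
    }

    func main() {
        calls := 0
        err := retryWrite(func() error {
            calls++
            if calls < 3 {
                return status.Error(codes.Unavailable, "failover in progress")
            }
            return nil
        }, 5)
        fmt.Println(calls, err)
    }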
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 26c5be17..425bc60f 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"io"
 	"sync"
+	"time"

 	"go.uber.org/multierr"

@@ -102,7 +103,7 @@ type Publisher interface {

 // Broadcaster allow sending Messages to a Topic and receiving the responses.
 type Broadcaster interface {
-	Broadcast(topic Topic, message Message) ([]Future, error)
+	Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error)
 }

 type channel chan event
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 8f35f4b4..d0aee004 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -20,6 +20,7 @@ package measure
 import (
 	"context"
 	"fmt"
+	"time"

 	"go.uber.org/multierr"
 	"google.golang.org/protobuf/proto"
@@ -35,6 +36,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 )

+const defaultQueryTimeout = 10 * time.Second
+
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)

 type unresolvedDistributed struct {
@@ -138,11 +141,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, error) {
 	if t.maxDataPointsSize > 0 {
 		query.Limit = t.maxDataPointsSize
 	}
-	ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+	var allErr error
+	ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
 	if err != nil {
 		return nil, err
 	}
-	var allErr error
 	var see []sort.Iterator[*comparableDataPoint]
 	for _, f := range ff {
 		if m, getErr := f.Get(); getErr != nil {
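The distributed plans now hand back partial results together with the aggregated error (see `return result, allErr` in the stream plan below) instead of silently dropping node failures. The shape of that pattern, reduced to plain Go; `gather` and `fetch` are stand-ins for the future-draining code:

    package main

    import (
        "errors"
        "fmt"
    )

    // gather drains per-node futures and, unlike a fail-fast loop, keeps the
    // rows that did arrive while joining the per-node errors, so the caller
    // receives both the partial result set and the aggregated error.
    func gather(fetch []func() (string, error)) ([]string, error) {
        var rows []string
        var allErr error
        for _, f := range fetch {
            row, err := f()
            if err != nil {
                allErr = errors.Join(allErr, err)
                continue
            }
            rows = append(rows, row)
        }
        return rows, allErr
    }

    func main() {
        rows, err := gather([]func() (string, error){
            func() (string, error) { return "row-from-node1", nil },
            func() (string, error) { return "", errors.New("node2: deadline exceeded") },
        })
        fmt.Println(rows, err)
    }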
"google.golang.org/protobuf/proto" @@ -35,6 +36,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" ) +const defaultQueryTimeout = 10 * time.Second + var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) type unresolvedDistributed struct { @@ -128,7 +131,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err if t.maxElementSize > 0 { query.Limit = t.maxElementSize } - ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) + ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) if err != nil { return nil, err } @@ -152,7 +155,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err for iter.Next() { result = append(result, iter.Val().Element) } - return result, nil + return result, allErr } func (t *distributedPlan) String() string { diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go index e0398efc..90e991b9 100644 --- a/test/cases/measure/measure.go +++ b/test/cases/measure/measure.go @@ -60,7 +60,7 @@ var _ = g.DescribeTable("Scanning Measures", verify, g.Entry("filter by several entity ids", helpers.Args{Input: "entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("filter by entity id and service id", helpers.Args{Input: "entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), - g.Entry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}), + g.FEntry("invalid logical expression", helpers.Args{Input: "err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantErr: true}), g.Entry("linked or expressions", helpers.Args{Input: "linked_or", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("In and not In expressions", helpers.Args{Input: "in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}), g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
