This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch publish
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e438ad75162f954c37db504e5836f94fe2f9b916
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 16 02:42:25 2024 +0000

    Update doc
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/dquery/topn.go                             |  10 +-
 banyand/queue/local.go                             |   2 +-
 banyand/queue/pub/client.go                        |  65 ++++++++----
 banyand/queue/pub/client_test.go                   |  21 +++-
 banyand/queue/pub/pub.go                           |  76 ++++++++-----
 banyand/queue/pub/pub_suite_test.go                |  21 +++-
 banyand/queue/pub/pub_test.go                      | 117 ++++++++++++++++++---
 docs/concept/clustering.md                         |  14 +++
 pkg/bus/bus.go                                     |   3 +-
 .../logical/measure/measure_plan_distributed.go    |   7 +-
 .../logical/stream/stream_plan_distributed.go      |   7 +-
 test/cases/measure/measure.go                      |   2 +-
 12 files changed, 273 insertions(+), 72 deletions(-)
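
The core API change in this commit is that bus.Broadcaster.Broadcast now takes an explicit per-call timeout as its first argument (see pkg/bus/bus.go below), instead of a timeout hard-coded inside the publisher. A minimal sketch of the new call shape, adapted from the tests in this patch; the broadcastStreamQuery helper and the fixed message ID are illustrative only, not part of the change:

    package example

    import (
        "time"

        "github.com/apache/skywalking-banyandb/api/data"
        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
    )

    // broadcastStreamQuery shows the new Broadcast signature: the caller now
    // chooses the per-call timeout rather than relying on the previous
    // hard-coded 10s inside the publisher. Hypothetical helper.
    func broadcastStreamQuery(b bus.Broadcaster, req *streamv1.QueryRequest) ([]bus.Future, error) {
        const queryTimeout = 10 * time.Second // mirrors defaultQueryTimeout introduced by this commit
        return b.Broadcast(queryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), req))
    }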

diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index f5d7cf1f..ad666a45 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -18,6 +18,8 @@
 package dquery
 
 import (
+       "time"
+
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -31,6 +33,8 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
+const defaultTopNQueryTimeout = 10 * time.Second
+
 type topNQueryProcessor struct {
        broadcaster bus.Broadcaster
        *queryService
@@ -52,7 +56,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        agg := request.Agg
        request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
        now := bus.MessageID(request.TimeRange.Begin.Nanos)
-       ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request))
+       ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request))
        if err != nil {
                resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err))
                return
@@ -87,6 +91,10 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
                        }
                }
        }
+       if allErr != nil {
+               resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), allErr))
+               return
+       }
        if tags == nil {
                resp = bus.NewMessage(now, &measurev1.TopNResponse{})
                return
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 58c67781..75cfb47d 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -65,7 +65,7 @@ func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, er
        return l.local.Publish(topic, message...)
 }
 
-func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) {
+func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) {
        f, err := l.Publish(topic, message)
        if err != nil {
                return nil, err
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 7bdc77da..b82dbc67 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -97,11 +97,13 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        p.mu.Lock()
        defer p.mu.Unlock()
 
+       p.registered[name] = struct{}{}
+
        // If the client already exists, just return
-       if _, ok := p.clients[name]; ok {
+       if _, ok := p.active[name]; ok {
                return
        }
-       if _, ok := p.evictClients[name]; ok {
+       if _, ok := p.evictable[name]; ok {
                return
        }
        conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
@@ -111,15 +113,16 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        }
 
        if !p.checkClient(conn, md) {
+               p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
                return
        }
 
        c := clusterv1.NewServiceClient(conn)
-       p.clients[name] = &client{conn: conn, client: c, md: md}
+       p.active[name] = &client{conn: conn, client: c, md: md}
        if p.handler != nil {
                p.handler.OnAddOrUpdate(md)
        }
-       p.log.Info().Stringer("node", node).Msg("new node is healthy, add it to active queue")
+       p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue")
 }
 
 func (p *pub) OnDelete(md schema.Metadata) {
@@ -138,20 +141,21 @@ func (p *pub) OnDelete(md schema.Metadata) {
        }
        p.mu.Lock()
        defer p.mu.Unlock()
-       if en, ok := p.evictClients[name]; ok {
+       delete(p.registered, name)
+       if en, ok := p.evictable[name]; ok {
                close(en.c)
-               delete(p.evictClients, name)
-               p.log.Info().Stringer("node", node).Msg("node is removed from evict queue by delete event")
+               delete(p.evictable, name)
+               p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from evict queue by delete event")
                return
        }
 
-       if client, ok := p.clients[name]; ok && !p.healthCheck(node, client.conn) {
+       if client, ok := p.active[name]; ok && !p.healthCheck(node, client.conn) {
                _ = client.conn.Close()
-               delete(p.clients, name)
+               delete(p.active, name)
                if p.handler != nil {
                        p.handler.OnDelete(md)
                }
-               p.log.Info().Stringer("node", node).Msg("node is removed from active queue by delete event")
+               p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from active queue by delete event")
        }
 }
 
@@ -168,9 +172,8 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
        if !p.closer.AddRunning() {
                return false
        }
-       p.log.Info().Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
        name := node.Metadata.Name
-       p.evictClients[name] = evictNode{n: node, c: make(chan struct{})}
+       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
        if p.handler != nil {
                p.handler.OnDelete(md)
        }
@@ -184,22 +187,25 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
                                if errEvict == nil && p.healthCheck(en.n, connEvict) {
                                        p.mu.Lock()
                                        defer p.mu.Unlock()
-                                       if _, ok := p.evictClients[name]; !ok {
+                                       if _, ok := p.evictable[name]; !ok {
                                                // The client has been removed from evict clients map, just return
                                                return
                                        }
                                        c := clusterv1.NewServiceClient(connEvict)
-                                       p.clients[name] = &client{conn: connEvict, client: c, md: md}
+                                       p.active[name] = &client{conn: connEvict, client: c, md: md}
                                        if p.handler != nil {
                                                p.handler.OnAddOrUpdate(md)
                                        }
-                                       delete(p.evictClients, name)
+                                       delete(p.evictable, name)
                                        p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
                                        return
                                }
                                if errEvict != nil {
                                        _ = connEvict.Close()
                                }
+                               if _, ok := p.registered[name]; !ok {
+                                       return
+                               }
                                p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
                        case <-en.c:
                                return
@@ -210,7 +216,7 @@ func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
                                backoff = maxBackoff
                        }
                }
-       }(p, name, p.evictClients[name], md)
+       }(p, name, p.evictable[name], md)
        return false
 }
 
@@ -237,20 +243,37 @@ func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
 func (p *pub) failover(node string) {
        p.mu.Lock()
        defer p.mu.Unlock()
-       if en, ok := p.evictClients[node]; ok {
+       if en, ok := p.evictable[node]; ok {
                close(en.c)
-               delete(p.evictClients, node)
-               p.log.Info().Str("node", node).Msg("node is removed from evict queue by wire event")
+               delete(p.evictable, node)
+               p.log.Info().Str("node", node).Str("status", p.dump()).Msg("node is removed from evict queue by wire event")
                return
        }
 
-       if client, ok := p.clients[node]; ok && !p.checkClient(client.conn, client.md) {
+       if client, ok := p.active[node]; ok && !p.checkClient(client.conn, client.md) {
                _ = client.conn.Close()
-               delete(p.clients, node)
+               delete(p.active, node)
                if p.handler != nil {
                        p.handler.OnDelete(client.md)
                }
+               p.log.Info().Str("status", p.dump()).Str("node", node).Msg("node is unhealthy, move it to evict queue")
+       }
+}
+
+func (p *pub) dump() string {
+       keysRegistered := make([]string, 0, len(p.registered))
+       for k := range p.registered {
+               keysRegistered = append(keysRegistered, k)
+       }
+       keysActive := make([]string, 0, len(p.active))
+       for k := range p.active {
+               keysActive = append(keysActive, k)
+       }
+       keysEvictable := make([]string, 0, len(p.evictable))
+       for k := range p.evictable {
+               keysEvictable = append(keysEvictable, k)
        }
+       return fmt.Sprintf("registered: %v, active :%v, evictable :%v", keysRegistered, keysActive, keysEvictable)
 }
 
 type evictNode struct {
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
index 83200aa9..2ced4841 100644
--- a/banyand/queue/pub/client_test.go
+++ b/banyand/queue/pub/client_test.go
@@ -73,17 +73,32 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                gomega.Eventually(func() int {
                        p.mu.RLock()
                        defer p.mu.RUnlock()
-                       return len(p.clients)
+                       return len(p.active)
                }, flags.EventuallyTimeout).Should(gomega.Equal(1))
                verifyClients(p, 1, 0, 1, 1)
        })
+
+       ginkgo.It("should be removed", func() {
+               addr1 := getAddress()
+               node1 := getDataNode("node1", addr1)
+               p := newPub()
+               defer p.GracefulStop()
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               closeFn()
+               p.failover("node1")
+               verifyClients(p, 0, 1, 1, 2)
+               p.OnDelete(node1)
+               verifyClients(p, 0, 0, 1, 2)
+       })
 })
 
 func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
        p.mu.RLock()
        defer p.mu.RUnlock()
-       gomega.Expect(p.clients).Should(gomega.HaveLen(active))
-       gomega.Expect(p.evictClients).Should(gomega.HaveLen(evict))
+       gomega.Expect(p.active).Should(gomega.HaveLen(active))
+       gomega.Expect(p.evictable).Should(gomega.HaveLen(evict))
        h := p.handler.(*mockHandler)
        gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
        gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 200b2b9d..dc650cfa 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -27,6 +27,8 @@ import (
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/anypb"
 
@@ -47,13 +49,14 @@ var (
 
 type pub struct {
        schema.UnimplementedOnInitHandler
-       metadata     metadata.Repo
-       handler      schema.EventHandler
-       log          *logger.Logger
-       clients      map[string]*client
-       evictClients map[string]evictNode
-       closer       *run.Closer
-       mu           sync.RWMutex
+       metadata   metadata.Repo
+       handler    schema.EventHandler
+       log        *logger.Logger
+       registered map[string]struct{}
+       active     map[string]*client
+       evictable  map[string]evictNode
+       closer     *run.Closer
+       mu         sync.RWMutex
 }
 
 func (p *pub) Register(handler schema.EventHandler) {
@@ -63,16 +66,16 @@ func (p *pub) Register(handler schema.EventHandler) {
 func (p *pub) GracefulStop() {
        p.mu.Lock()
        defer p.mu.Unlock()
-       for i := range p.evictClients {
-               close(p.evictClients[i].c)
+       for i := range p.evictable {
+               close(p.evictable[i].c)
        }
-       p.evictClients = nil
+       p.evictable = nil
        p.closer.Done()
        p.closer.CloseThenWait()
-       for _, c := range p.clients {
+       for _, c := range p.active {
                _ = c.conn.Close()
        }
-       p.clients = nil
+       p.active = nil
 }
 
 // Serve implements run.Service.
@@ -80,10 +83,10 @@ func (p *pub) Serve() run.StopNotify {
        return p.closer.CloseNotify()
 }
 
-func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
+func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) {
        var names []string
        p.mu.RLock()
-       for k := range p.clients {
+       for k := range p.active {
                names = append(names, k)
        }
        p.mu.RUnlock()
@@ -93,7 +96,7 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
                wg.Add(1)
                go func(n string) {
                        defer wg.Done()
-                       f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+                       f, err := p.publish(timeout, topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
                        futureCh <- publishResult{n: n, f: f, e: err}
                }(n)
        }
@@ -106,6 +109,14 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
        for f := range futureCh {
                if f.e != nil {
                        errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n))
+                       if isFailoverError(f.e) {
+                               if p.closer.AddRunning() {
+                                       go func() {
+                                               defer p.closer.Done()
+                                               p.failover(f.n)
+                                       }()
+                               }
+                       }
                        continue
                }
                futures = append(futures, f.f)
@@ -123,7 +134,7 @@ type publishResult struct {
        n string
 }
 
-func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
        var err error
        f := &future{}
        handleMessage := func(m bus.Message, err error) error {
@@ -133,12 +144,12 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
                }
                node := m.Node()
                p.mu.RLock()
-               client, ok := p.clients[node]
+               client, ok := p.active[node]
                p.mu.RUnlock()
                if !ok {
                        return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
                }
-               ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+               ctx, cancel := context.WithTimeout(context.Background(), timeout)
                f.cancelFn = append(f.cancelFn, cancel)
                stream, errCreateStream := client.client.Send(ctx)
                if errCreateStream != nil {
@@ -158,6 +169,10 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
        return f, err
 }
 
+func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+       return p.publish(5*time.Second, topic, messages...)
+}
+
 // NewBatchPublisher returns a new batch publisher.
 func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {
        return &batchPublisher{
@@ -171,10 +186,11 @@ func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {
 // New returns a new queue client.
 func New(metadata metadata.Repo) queue.Client {
        return &pub{
-               metadata:     metadata,
-               clients:      make(map[string]*client),
-               evictClients: make(map[string]evictNode),
-               closer:       run.NewCloser(1),
+               metadata:   metadata,
+               active:     make(map[string]*client),
+               evictable:  make(map[string]evictNode),
+               registered: make(map[string]struct{}),
+               closer:     run.NewCloser(1),
        }
 }
 
@@ -253,7 +269,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                }
 
                bp.pub.mu.RLock()
-               client, ok := bp.pub.clients[node]
+               client, ok := bp.pub.active[node]
                bp.pub.mu.RUnlock()
                if !ok {
                        err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
@@ -393,10 +409,12 @@ func (b *batchFuture) get() []string {
                                        mux.Lock()
                                        defer mux.Unlock()
                                        // only log the error once for each node
-                                       if _, ok := b.errNodes[evt.n]; !ok {
+                                       if _, ok := b.errNodes[evt.n]; !ok && isFailoverError(evt.e) {
                                                b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
+                                               b.errNodes[evt.n] = struct{}{}
+                                               return
                                        }
-                                       b.errNodes[evt.n] = struct{}{}
+                                       b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
                                }()
                        }
                }(e, &mux)
@@ -410,3 +428,11 @@ func (b *batchFuture) get() []string {
        }
        return result
 }
+
+func isFailoverError(err error) bool {
+       s, ok := status.FromError(err)
+       if !ok {
+               return false
+       }
+       return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
+}
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index 9a1c65aa..79080d9a 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -32,7 +32,9 @@ import (
        "google.golang.org/grpc/health"
        "google.golang.org/grpc/health/grpc_health_v1"
        "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/anypb"
 
+       "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -80,10 +82,27 @@ func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
                        time.Sleep(s.latency)
                }
 
+               topic, ok := data.TopicMap[req.Topic]
+
+               if !ok {
+                       panic("invalid topic")
+               }
+               f := data.TopicResponseMap[topic]
+               var body *anypb.Any
+               if f == nil {
+                       body = req.Body
+               } else {
+                       var errAny error
+                       body, errAny = anypb.New(f())
+                       if errAny != nil {
+                               panic(errAny)
+                       }
+               }
+
                res := &clusterv1.SendResponse{
                        MessageId: req.MessageId,
                        Error:     "",
-                       Body:      req.Body,
+                       Body:      body,
                }
 
                if err := stream.Send(res); err != nil {
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index bf20bfc0..3f4f39d5 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -24,13 +24,15 @@ import (
        "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
        "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
 
+       "github.com/apache/skywalking-banyandb/api/data"
        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var _ = ginkgo.Describe("publish clients register/unregister", func() {
+var _ = ginkgo.Describe("Publish and Broadcast", func() {
        var goods []gleak.Goroutine
        ginkgo.BeforeEach(func() {
                goods = gleak.Goroutines()
@@ -39,7 +41,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
        })
 
-       ginkgo.Context("publisher and batch publisher", func() {
+       ginkgo.Context("Publisher and batch publisher", func() {
                ginkgo.It("should publish messages", func() {
                        addr1 := getAddress()
                        addr2 := getAddress()
@@ -58,9 +60,8 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
 
                        bp := p.NewBatchPublisher(3 * time.Second)
                        defer bp.Close()
-                       t := bus.UniTopic("test")
                        for i := 0; i < 10; i++ {
-                               _, err := bp.Publish(t,
+                               _, err := bp.Publish(data.TopicStreamWrite,
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
                                )
@@ -84,10 +85,9 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher(3 * time.Second)
-                       t := bus.UniTopic("test")
+                       bp := p.NewBatchPublisher(30 * time.Second)
                        for i := 0; i < 10; i++ {
-                               _, err := bp.Publish(t,
+                               _, err := bp.Publish(data.TopicStreamWrite,
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
                                )
@@ -97,14 +97,14 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        gomega.Eventually(func() int {
                                p.mu.RLock()
                                defer p.mu.RUnlock()
-                               return len(p.clients)
+                               return len(p.active)
                        }, flags.EventuallyTimeout).Should(gomega.Equal(1))
                        func() {
                                p.mu.RLock()
                                defer p.mu.RUnlock()
-                               gomega.Expect(p.evictClients).Should(gomega.HaveLen(1))
-                               gomega.Expect(p.evictClients).Should(gomega.HaveKey("node2"))
-                               gomega.Expect(p.clients).Should(gomega.HaveKey("node1"))
+                               gomega.Expect(p.evictable).Should(gomega.HaveLen(1))
+                               gomega.Expect(p.evictable).Should(gomega.HaveKey("node2"))
+                               gomega.Expect(p.active).Should(gomega.HaveKey("node1"))
                        }()
                })
 
@@ -125,9 +125,8 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        p.OnAddOrUpdate(node2)
 
                        bp := p.NewBatchPublisher(3 * time.Second)
-                       t := bus.UniTopic("test")
                        for i := 0; i < 10; i++ {
-                               _, err := bp.Publish(t,
+                               _, err := bp.Publish(data.TopicStreamWrite,
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
                                        bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
                                )
@@ -137,8 +136,98 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        gomega.Consistently(func() int {
                                p.mu.RLock()
                                defer p.mu.RUnlock()
-                               return len(p.clients)
+                               return len(p.active)
                        }, "1s").Should(gomega.Equal(2))
                })
        })
+
+       ginkgo.Context("Broadcast", func() {
+               ginkgo.It("should broadcast messages", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       messages, err := ff[0].GetAll()
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(messages).Should(gomega.HaveLen(1))
+               })
+
+               ginkgo.It("should broadcast messages to failed nodes", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.Unavailable, 0)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       for i := range ff {
+                               _, err := ff[i].GetAll()
+                               ginkgo.GinkgoWriter.Printf("error: %v \n", err)
+                               if err != nil {
+                                       s, ok := status.FromError(err)
+                                       gomega.Expect(ok).Should(gomega.BeTrue())
+                                       gomega.Expect(s.Code()).Should(gomega.Equal(codes.Unavailable))
+                                       return
+                               }
+                       }
+                       ginkgo.Fail("should not reach here")
+               })
+
+               ginkgo.It("should broadcast messages to slow nodes", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 5*time.Second)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       for i := range ff {
+                               _, err := ff[i].GetAll()
+                               ginkgo.GinkgoWriter.Printf("error: %v \n", err)
+                               if err != nil {
+                                       s, ok := status.FromError(err)
+                                       gomega.Expect(ok).Should(gomega.BeTrue())
+                                       gomega.Expect(s.Code()).Should(gomega.Equal(codes.DeadlineExceeded))
+                                       return
+                               }
+                       }
+                       ginkgo.Fail("should not reach here")
+               })
+       })
 })
diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md
index 624bc828..3f49cda1 100644
--- a/docs/concept/clustering.md
+++ b/docs/concept/clustering.md
@@ -45,6 +45,8 @@ All nodes within a BanyanDB cluster communicate with other nodes according to th
 
 All nodes in the cluster are discovered by the Meta Nodes. When a node starts up, it registers itself with the Meta Nodes. The Meta Nodes then share this information with the Liaison Nodes which use it to route requests to the appropriate nodes.
 
+If data nodes are unable to connect to the meta nodes due to network partition or other issues, they will be removed from the meta nodes. However, the liaison nodes will not remove the data nodes from their routing list until the data nodes are also unreachable from the liaison nodes' perspective. This approach ensures that the system can continue to function even if some data nodes are temporarily unavailable from the meta nodes.
+
 ## 3. **Data Organization**
 
 Different nodes in BanyanDB are responsible for different parts of the database, while Query and Liaison Nodes manage the routing and processing of queries.
@@ -177,3 +179,15 @@ User
 4. The results from each shard are then returned to the Liaison Node, which consolidates them into a single response to the user.
 
 This architecture allows BanyanDB to execute queries efficiently across a distributed system, leveraging the distributed query capabilities of the Liaison Node and the parallel processing of Data Nodes.
+
+## 7. Failover
+
+BanyanDB is designed to be highly available and fault-tolerant.
+
+In case of a Data Node failure, the system can automatically recover and continue to operate.
+
+Liaison nodes have a built-in mechanism to detect the failure of a Data Node. When a Data Node fails, the Liaison Node will automatically route requests to other available Data Nodes with the same shard. This ensures that the system remains operational even in the face of node failures. Thanks to the query mode, which allows Liaison Nodes to access all Data Nodes, the system can continue to function even if some Data Nodes are unavailable. When the failed data nodes are restored, the sys [...]
+
+In the case of a Liaison Node failure, the system can be configured to have multiple Liaison Nodes for redundancy. If one Liaison Node fails, the other Liaison Nodes can take over its responsibilities, ensuring that the system remains available.
+
+> Please note that any written request which triggers the failover process will be rejected, and the client should re-send the request.
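
The rejection-and-retry behaviour described above maps onto the gRPC status codes that pub.go now treats as failover triggers (isFailoverError, added earlier in this patch). A rough client-side sketch, assuming the write error surfaces as a gRPC status; the shouldResend helper is hypothetical and not part of the commit:

    package example

    import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // shouldResend reports whether a rejected write likely hit the failover
    // path and is worth re-sending, mirroring the codes checked by
    // isFailoverError (Unavailable, DeadlineExceeded). Hypothetical helper.
    func shouldResend(err error) bool {
        s, ok := status.FromError(err)
        if !ok {
            return false // not a gRPC status error; treat as non-retryable here
        }
        return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded
    }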
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 26c5be17..425bc60f 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -23,6 +23,7 @@ import (
        "errors"
        "io"
        "sync"
+       "time"
 
        "go.uber.org/multierr"
 
@@ -102,7 +103,7 @@ type Publisher interface {
 
 // Broadcaster allow sending Messages to a Topic and receiving the responses.
 type Broadcaster interface {
-       Broadcast(topic Topic, message Message) ([]Future, error)
+       Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error)
 }
 
 type channel chan event
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 8f35f4b4..d0aee004 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -20,6 +20,7 @@ package measure
 import (
        "context"
        "fmt"
+       "time"
 
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
@@ -35,6 +36,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+const defaultQueryTimeout = 10 * time.Second
+
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
 type unresolvedDistributed struct {
@@ -138,11 +141,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro
        if t.maxDataPointsSize > 0 {
                query.Limit = t.maxDataPointsSize
        }
-       ff, err := dctx.Broadcast(data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       var allErr error
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
        }
-       var allErr error
        var see []sort.Iterator[*comparableDataPoint]
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 6f524737..8c3ed930 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -20,6 +20,7 @@ package stream
 import (
        "context"
        "fmt"
+       "time"
 
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
@@ -35,6 +36,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+const defaultQueryTimeout = 10 * time.Second
+
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
 type unresolvedDistributed struct {
@@ -128,7 +131,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
        if t.maxElementSize > 0 {
                query.Limit = t.maxElementSize
        }
-       ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
        }
@@ -152,7 +155,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
        for iter.Next() {
                result = append(result, iter.Val().Element)
        }
-       return result, nil
+       return result, allErr
 }
 
 func (t *distributedPlan) String() string {
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index e0398efc..90e991b9 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -60,7 +60,7 @@ var _ = g.DescribeTable("Scanning Measures", verify,
        g.Entry("filter by several entity ids", helpers.Args{Input: 
"entity_in", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("filter by entity id and service id", helpers.Args{Input: 
"entity_service", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
-       g.Entry("invalid logical expression", helpers.Args{Input: 
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantErr: true}),
+       g.FEntry("invalid logical expression", helpers.Args{Input: 
"err_invalid_le", Duration: 25 * time.Minute, Offset: -20 * time.Minute, 
WantErr: true}),
        g.Entry("linked or expressions", helpers.Args{Input: "linked_or", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("In and not In expressions", helpers.Args{Input: "in", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
        g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 * 
time.Minute, Offset: -20 * time.Minute}),
