This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ecbd14f Add failover process between liaison and data nodes (#433)
6ecbd14f is described below

commit 6ecbd14f8fdfaeb08119113ee63bfee3a0a3f2b3
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Apr 16 13:20:03 2024 +0800

    Add failover process between liaison and data nodes (#433)
---
 .github/workflows/ci.yml                           |   1 +
 banyand/dquery/topn.go                             |  10 +-
 banyand/internal/storage/index_test.go             |  15 +-
 banyand/liaison/grpc/measure.go                    |   3 +-
 banyand/liaison/grpc/server.go                     |   2 +
 banyand/liaison/grpc/stream.go                     |   3 +-
 banyand/measure/topn.go                            |   5 +-
 banyand/queue/local.go                             |   6 +-
 banyand/queue/pub/client.go                        | 167 +++++++++++++-
 banyand/queue/pub/client_test.go                   | 105 +++++++++
 banyand/queue/pub/pub.go                           | 162 +++++++++++---
 banyand/queue/pub/pub_suite_test.go                | 183 ++++++++++++++++
 banyand/queue/pub/pub_test.go                      | 239 +++++++++++++++++++++
 banyand/queue/queue.go                             |   3 +-
 docs/concept/clustering.md                         |  14 ++
 pkg/bus/bus.go                                     |   3 +-
 pkg/grpchelper/client.go                           |   6 +-
 .../logical/measure/measure_plan_distributed.go    |   7 +-
 .../logical/stream/stream_plan_distributed.go      |   7 +-
 test/stress/istio/istio_suite_test.go              |   6 +-
 20 files changed, 882 insertions(+), 65 deletions(-)
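
For orientation, the diff threads an explicit timeout through the messaging
APIs: bus.Broadcaster.Broadcast and queue.Client.NewBatchPublisher now take a
time.Duration. Below is a minimal caller-side sketch, assuming an already-wired
queue.Client; the function names, the "node1" target, and the concrete
durations are illustrative, while the signatures, topics, and import paths are
taken from this diff.

package example

import (
	"time"

	"github.com/apache/skywalking-banyandb/api/data"
	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
	"github.com/apache/skywalking-banyandb/banyand/queue"
	"github.com/apache/skywalking-banyandb/pkg/bus"
)

// broadcastQuery fans a query out to every active data node; the per-broadcast
// timeout is now an explicit argument instead of a hard-coded value.
func broadcastQuery(client queue.Client, req *streamv1.QueryRequest) error {
	futures, err := client.Broadcast(10*time.Second, data.TopicStreamQuery,
		bus.NewMessage(bus.MessageID(1), req))
	if err != nil {
		return err
	}
	for _, f := range futures {
		// Unavailable or deadline-exceeded errors surfacing here are what the
		// liaison uses to kick off failover for the offending node.
		if _, err := f.GetAll(); err != nil {
			return err
		}
	}
	return nil
}

// writeBatch streams write requests to a data node; the batch publisher now
// carries the write timeout configured on the liaison.
func writeBatch(client queue.Client, reqs []*streamv1.InternalWriteRequest) error {
	bp := client.NewBatchPublisher(15 * time.Second)
	defer bp.Close()
	for i, r := range reqs {
		// "node1" stands in for a real node name from the routing list.
		if _, err := bp.Publish(data.TopicStreamWrite,
			bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", r)); err != nil {
			return err
		}
	}
	return nil
}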

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8663ca24..5a7e943b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -120,6 +120,7 @@ jobs:
     uses: ./.github/workflows/test.yml
     with:
       options: --fail-fast --label-filter \\!slow
+      timeout-minutes: 30
       
   result:
     name: Continuous Integration
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index f5d7cf1f..ad666a45 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -18,6 +18,8 @@
 package dquery
 
 import (
+       "time"
+
        "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -31,6 +33,8 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
+const defaultTopNQueryTimeout = 10 * time.Second
+
 type topNQueryProcessor struct {
        broadcaster bus.Broadcaster
        *queryService
@@ -52,7 +56,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
        agg := request.Agg
        request.Agg = 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
        now := bus.MessageID(request.TimeRange.Begin.Nanos)
-       ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, 
bus.NewMessage(now, request))
+       ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, 
data.TopicTopNQuery, bus.NewMessage(now, request))
        if err != nil {
                resp = bus.NewMessage(now, common.NewError("execute the query 
%s: %v", request.Metadata.GetName(), err))
                return
@@ -87,6 +91,10 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp 
bus.Message) {
                        }
                }
        }
+       if allErr != nil {
+               resp = bus.NewMessage(now, common.NewError("execute the query 
%s: %v", request.Metadata.GetName(), allErr))
+               return
+       }
        if tags == nil {
                resp = bus.NewMessage(now, &measurev1.TopNResponse{})
                return
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 253a64d3..8e59b4e1 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -209,16 +209,23 @@ func TestSeriesIndexController(t *testing.T) {
                require.NoError(t, err)
                defer sic.Close()
                require.NoError(t, 
sic.run(time.Now().Add(-time.Hour*23+10*time.Minute)))
-               require.NotNil(t, sic.standby)
+               sic.RLock()
+               standby := sic.standby
+               sic.RUnlock()
+               require.NotNil(t, standby)
                idxNames := make([]string, 0)
                walkDir(tmpDir, "idx-", func(suffix string) error {
                        idxNames = append(idxNames, suffix)
                        return nil
                })
                assert.Equal(t, 2, len(idxNames))
-               nextTime := sic.standby.startTime
+               nextTime := standby.startTime
                require.NoError(t, sic.run(time.Now().Add(time.Hour)))
-               require.Nil(t, sic.standby)
-               assert.Equal(t, nextTime, sic.hot.startTime)
+               sic.RLock()
+               standby = sic.standby
+               hot := sic.hot
+               sic.RUnlock()
+               require.Nil(t, standby)
+               assert.Equal(t, nextTime, hot.startTime)
        })
 }
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 930cec03..d149aef5 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -46,6 +46,7 @@ type measureService struct {
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
+       writeTimeout       time.Duration
 }
 
 func (ms *measureService) setLogger(log *logger.Logger) {
@@ -67,7 +68,7 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                }
        }
        ctx := measure.Context()
-       publisher := ms.pipeline.NewBatchPublisher()
+       publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
        defer publisher.Close()
        for {
                select {
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 102bd170..a4d48bb5 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -178,6 +178,8 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
        fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", 
false, "enable ingestion access log")
        fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access 
log root path")
+       fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 
15*time.Second, "stream write timeout")
+       fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 
15*time.Second, "measure write timeout")
        return fs
 }
 
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 3e530ff6..37635a4b 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -46,6 +46,7 @@ type streamService struct {
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
+       writeTimeout       time.Duration
 }
 
 func (s *streamService) setLogger(log *logger.Logger) {
@@ -66,7 +67,7 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        logger.Err(errResp).Msg("failed to send response")
                }
        }
-       publisher := s.pipeline.NewBatchPublisher()
+       publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
        defer publisher.Close()
        ctx := stream.Context()
        for {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 90b2d473..8f657175 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -50,7 +50,8 @@ import (
 )
 
 const (
-       timeBucketFormat = "200601021504"
+       timeBucketFormat         = "200601021504"
+       resultPersistencyTimeout = 10 * time.Second
 )
 
 var (
@@ -133,7 +134,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord) err
        eventTime := t.downSampleTimeBucket(record.TimestampMillis())
        timeBucket := eventTime.Format(timeBucketFormat)
        var err error
-       publisher := t.pipeline.NewBatchPublisher()
+       publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
        defer publisher.Close()
        for group, tuples := range tuplesGroups {
                if e := t.l.Debug(); e.Enabled() {
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 9609ac24..75cfb47d 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,6 +19,8 @@
 package queue
 
 import (
+       "time"
+
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -63,7 +65,7 @@ func (l *local) Publish(topic bus.Topic, message 
...bus.Message) (bus.Future, er
        return l.local.Publish(topic, message...)
 }
 
-func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, 
error) {
+func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message 
bus.Message) ([]bus.Future, error) {
        f, err := l.Publish(topic, message)
        if err != nil {
                return nil, err
@@ -75,7 +77,7 @@ func (l local) Name() string {
        return "local-pipeline"
 }
 
-func (l local) NewBatchPublisher() BatchPublisher {
+func (l local) NewBatchPublisher(_ time.Duration) BatchPublisher {
        return &localBatchPublisher{
                local: l.local,
        }
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 72014607..b82dbc67 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -18,17 +18,29 @@
 package pub
 
 import (
+       "context"
        "fmt"
+       "time"
 
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/health/grpc_health_v1"
 
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+const rpcTimeout = 2 * time.Second
+
 var (
+       // Retry policy for health check.
+       initBackoff       = time.Second
+       maxBackoff        = 20 * time.Second
+       backoffMultiplier = 2.0
+
        serviceName = clusterv1.Service_ServiceDesc.ServiceName
 
        // The timeout is set by each RPC.
@@ -49,6 +61,7 @@ var (
 type client struct {
        client clusterv1.ServiceClient
        conn   *grpc.ClientConn
+       md     schema.Metadata
 }
 
 func (p *pub) OnAddOrUpdate(md schema.Metadata) {
@@ -84,8 +97,13 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        p.mu.Lock()
        defer p.mu.Unlock()
 
+       p.registered[name] = struct{}{}
+
        // If the client already exists, just return
-       if _, ok := p.clients[name]; ok {
+       if _, ok := p.active[name]; ok {
+               return
+       }
+       if _, ok := p.evictable[name]; ok {
                return
        }
        conn, err := grpc.Dial(address, 
grpc.WithTransportCredentials(insecure.NewCredentials()), 
grpc.WithDefaultServiceConfig(retryPolicy))
@@ -93,11 +111,18 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                p.log.Error().Err(err).Msg("failed to connect to grpc server")
                return
        }
+
+       if !p.checkClient(conn, md) {
+               p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("node is unhealthy, move it to evict queue")
+               return
+       }
+
        c := clusterv1.NewServiceClient(conn)
-       p.clients[name] = &client{conn: conn, client: c}
+       p.active[name] = &client{conn: conn, client: c, md: md}
        if p.handler != nil {
                p.handler.OnAddOrUpdate(md)
        }
+       p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new 
node is healthy, add it to active queue")
 }
 
 func (p *pub) OnDelete(md schema.Metadata) {
@@ -116,14 +141,142 @@ func (p *pub) OnDelete(md schema.Metadata) {
        }
        p.mu.Lock()
        defer p.mu.Unlock()
+       delete(p.registered, name)
+       if en, ok := p.evictable[name]; ok {
+               close(en.c)
+               delete(p.evictable, name)
+               p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("node is removed from evict queue by delete event")
+               return
+       }
 
-       if client, ok := p.clients[name]; ok {
-               if client.conn != nil {
-                       client.conn.Close() // Close the client connection
-               }
-               delete(p.clients, name)
+       if client, ok := p.active[name]; ok && !p.healthCheck(node, 
client.conn) {
+               _ = client.conn.Close()
+               delete(p.active, name)
                if p.handler != nil {
                        p.handler.OnDelete(md)
                }
+               p.log.Info().Str("status", p.dump()).Stringer("node", 
node).Msg("node is removed from active queue by delete event")
+       }
+}
+
+func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
+       node, ok := md.Spec.(*databasev1.Node)
+       if !ok {
+               logger.Panicf("failed to cast node spec")
+               return false
+       }
+       if p.healthCheck(node, conn) {
+               return true
+       }
+       _ = conn.Close()
+       if !p.closer.AddRunning() {
+               return false
+       }
+       name := node.Metadata.Name
+       p.evictable[name] = evictNode{n: node, c: make(chan struct{})}
+       if p.handler != nil {
+               p.handler.OnDelete(md)
+       }
+       go func(p *pub, name string, en evictNode, md schema.Metadata) {
+               defer p.closer.Done()
+               backoff := initBackoff
+               for {
+                       select {
+                       case <-time.After(backoff):
+                               connEvict, errEvict := 
grpc.Dial(node.GrpcAddress, 
grpc.WithTransportCredentials(insecure.NewCredentials()), 
grpc.WithDefaultServiceConfig(retryPolicy))
+                               if errEvict == nil && p.healthCheck(en.n, 
connEvict) {
+                                       p.mu.Lock()
+                                       defer p.mu.Unlock()
+                                       if _, ok := p.evictable[name]; !ok {
+                                               // The client has been removed 
from evict clients map, just return
+                                               return
+                                       }
+                                       c := 
clusterv1.NewServiceClient(connEvict)
+                                       p.active[name] = &client{conn: 
connEvict, client: c, md: md}
+                                       if p.handler != nil {
+                                               p.handler.OnAddOrUpdate(md)
+                                       }
+                                       delete(p.evictable, name)
+                                       p.log.Info().Stringer("node", 
en.n).Msg("node is healthy, move it back to active queue")
+                                       return
+                               }
+                               if errEvict != nil {
+                                       _ = connEvict.Close()
+                               }
+                               if _, ok := p.registered[name]; !ok {
+                                       return
+                               }
+                               p.log.Error().Err(errEvict).Msgf("failed to 
re-connect to grpc server after waiting for %s", backoff)
+                       case <-en.c:
+                               return
+                       }
+                       if backoff < maxBackoff {
+                               backoff *= time.Duration(backoffMultiplier)
+                       } else {
+                               backoff = maxBackoff
+                       }
+               }
+       }(p, name, p.evictable[name], md)
+       return false
+}
+
+func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
+       var resp *grpc_health_v1.HealthCheckResponse
+       if err := grpchelper.Request(context.Background(), rpcTimeout, 
func(rpcCtx context.Context) (err error) {
+               resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx,
+                       &grpc_health_v1.HealthCheckRequest{
+                               Service: "",
+                       })
+               return err
+       }); err != nil {
+               if e := p.log.Debug(); e.Enabled() {
+                       e.Err(err).Stringer("node", node).Msg("service 
unhealthy")
+               }
+               return false
+       }
+       if resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING {
+               return true
+       }
+       return false
+}
+
+func (p *pub) failover(node string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if en, ok := p.evictable[node]; ok {
+               close(en.c)
+               delete(p.evictable, node)
+               p.log.Info().Str("node", node).Str("status", 
p.dump()).Msg("node is removed from evict queue by wire event")
+               return
+       }
+
+       if client, ok := p.active[node]; ok && !p.checkClient(client.conn, 
client.md) {
+               _ = client.conn.Close()
+               delete(p.active, node)
+               if p.handler != nil {
+                       p.handler.OnDelete(client.md)
+               }
+               p.log.Info().Str("status", p.dump()).Str("node", 
node).Msg("node is unhealthy, move it to evict queue")
        }
 }
+
+func (p *pub) dump() string {
+       keysRegistered := make([]string, 0, len(p.registered))
+       for k := range p.registered {
+               keysRegistered = append(keysRegistered, k)
+       }
+       keysActive := make([]string, 0, len(p.active))
+       for k := range p.active {
+               keysActive = append(keysActive, k)
+       }
+       keysEvictable := make([]string, 0, len(p.evictable))
+       for k := range p.evictable {
+               keysEvictable = append(keysEvictable, k)
+       }
+       return fmt.Sprintf("registered: %v, active :%v, evictable :%v", 
keysRegistered, keysActive, keysEvictable)
+}
+
+type evictNode struct {
+       n *databasev1.Node
+       c chan struct{}
+}
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
new file mode 100644
index 00000000..2ced4841
--- /dev/null
+++ b/banyand/queue/pub/client_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc/codes"
+
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("publish clients register/unregister", func() {
+       var goods []gleak.Goroutine
+       ginkgo.BeforeEach(func() {
+               goods = gleak.Goroutines()
+       })
+       ginkgo.AfterEach(func() {
+               gomega.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+       ginkgo.It("should register and unregister clients", func() {
+               addr1 := getAddress()
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               p := newPub()
+               defer func() {
+                       p.GracefulStop()
+                       closeFn()
+               }()
+               node1 := getDataNode("node1", addr1)
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               addr2 := getAddress()
+               node2 := getDataNode("node2", addr2)
+               p.OnAddOrUpdate(node2)
+               verifyClients(p, 1, 1, 1, 1)
+
+               p.OnDelete(node1)
+               verifyClients(p, 1, 1, 1, 1)
+               p.OnDelete(node2)
+               verifyClients(p, 1, 0, 1, 1)
+               closeFn()
+               p.OnDelete(node1)
+               verifyClients(p, 0, 0, 1, 2)
+       })
+
+       ginkgo.It("should move back to active queue", func() {
+               addr1 := getAddress()
+               node1 := getDataNode("node1", addr1)
+               p := newPub()
+               defer p.GracefulStop()
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 0, 1, 0, 1)
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               defer closeFn()
+               gomega.Eventually(func() int {
+                       p.mu.RLock()
+                       defer p.mu.RUnlock()
+                       return len(p.active)
+               }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+               verifyClients(p, 1, 0, 1, 1)
+       })
+
+       ginkgo.It("should be removed", func() {
+               addr1 := getAddress()
+               node1 := getDataNode("node1", addr1)
+               p := newPub()
+               defer p.GracefulStop()
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               closeFn()
+               p.failover("node1")
+               verifyClients(p, 0, 1, 1, 2)
+               p.OnDelete(node1)
+               verifyClients(p, 0, 0, 1, 2)
+       })
+})
+
+func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+       gomega.Expect(p.active).Should(gomega.HaveLen(active))
+       gomega.Expect(p.evictable).Should(gomega.HaveLen(evict))
+       h := p.handler.(*mockHandler)
+       gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
+       gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d2576e51..2acaf6ff 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -27,6 +27,8 @@ import (
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/anypb"
 
@@ -47,12 +49,14 @@ var (
 
 type pub struct {
        schema.UnimplementedOnInitHandler
-       metadata metadata.Repo
-       handler  schema.EventHandler
-       log      *logger.Logger
-       clients  map[string]*client
-       closer   *run.Closer
-       mu       sync.RWMutex
+       metadata   metadata.Repo
+       handler    schema.EventHandler
+       log        *logger.Logger
+       registered map[string]struct{}
+       active     map[string]*client
+       evictable  map[string]evictNode
+       closer     *run.Closer
+       mu         sync.RWMutex
 }
 
 func (p *pub) Register(handler schema.EventHandler) {
@@ -60,13 +64,18 @@ func (p *pub) Register(handler schema.EventHandler) {
 }
 
 func (p *pub) GracefulStop() {
-       p.closer.Done()
-       p.closer.CloseThenWait()
        p.mu.Lock()
        defer p.mu.Unlock()
-       for _, c := range p.clients {
+       for i := range p.evictable {
+               close(p.evictable[i].c)
+       }
+       p.evictable = nil
+       p.closer.Done()
+       p.closer.CloseThenWait()
+       for _, c := range p.active {
                _ = c.conn.Close()
        }
+       p.active = nil
 }
 
 // Serve implements run.Service.
@@ -74,10 +83,10 @@ func (p *pub) Serve() run.StopNotify {
        return p.closer.CloseNotify()
 }
 
-func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, 
error) {
+func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages 
bus.Message) ([]bus.Future, error) {
        var names []string
        p.mu.RLock()
-       for k := range p.clients {
+       for k := range p.active {
                names = append(names, k)
        }
        p.mu.RUnlock()
@@ -85,10 +94,9 @@ func (p *pub) Broadcast(topic bus.Topic, messages 
bus.Message) ([]bus.Future, er
        var wg sync.WaitGroup
        for _, n := range names {
                wg.Add(1)
-               // Send a value into sem. If sem is full, this will block until 
there's room.
                go func(n string) {
                        defer wg.Done()
-                       f, err := p.Publish(topic, 
bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+                       f, err := p.publish(timeout, topic, 
bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
                        futureCh <- publishResult{n: n, f: f, e: err}
                }(n)
        }
@@ -101,6 +109,14 @@ func (p *pub) Broadcast(topic bus.Topic, messages 
bus.Message) ([]bus.Future, er
        for f := range futureCh {
                if f.e != nil {
                        errs = multierr.Append(errs, errors.Wrapf(f.e, "failed 
to publish message to %s", f.n))
+                       if isFailoverError(f.e) {
+                               if p.closer.AddRunning() {
+                                       go func() {
+                                               defer p.closer.Done()
+                                               p.failover(f.n)
+                                       }()
+                               }
+                       }
                        continue
                }
                futures = append(futures, f.f)
@@ -118,22 +134,22 @@ type publishResult struct {
        n string
 }
 
-func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, 
error) {
+func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages 
...bus.Message) (bus.Future, error) {
        var err error
        f := &future{}
        handleMessage := func(m bus.Message, err error) error {
                r, errSend := messageToRequest(topic, m)
                if errSend != nil {
-                       return multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, errSend))
+                       return multierr.Append(err, fmt.Errorf("failed to 
marshal message[%d]: %w", m.ID(), errSend))
                }
                node := m.Node()
                p.mu.RLock()
-               client, ok := p.clients[node]
+               client, ok := p.active[node]
                p.mu.RUnlock()
                if !ok {
                        return multierr.Append(err, fmt.Errorf("failed to get 
client for node %s", node))
                }
-               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               ctx, cancel := context.WithTimeout(context.Background(), 
timeout)
                f.cancelFn = append(f.cancelFn, cancel)
                stream, errCreateStream := client.client.Send(ctx)
                if errCreateStream != nil {
@@ -153,17 +169,28 @@ func (p *pub) Publish(topic bus.Topic, messages 
...bus.Message) (bus.Future, err
        return f, err
 }
 
+func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, 
error) {
+       return p.publish(5*time.Second, topic, messages...)
+}
+
 // NewBatchPublisher returns a new batch publisher.
-func (p *pub) NewBatchPublisher() queue.BatchPublisher {
-       return &batchPublisher{pub: p, streams: make(map[string]writeStream)}
+func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {
+       return &batchPublisher{
+               pub:     p,
+               streams: make(map[string]writeStream),
+               timeout: timeout,
+               f:       batchFuture{errNodes: make(map[string]struct{}), l: 
p.log},
+       }
 }
 
 // New returns a new queue client.
 func New(metadata metadata.Repo) queue.Client {
        return &pub{
-               metadata: metadata,
-               clients:  make(map[string]*client),
-               closer:   run.NewCloser(1),
+               metadata:   metadata,
+               active:     make(map[string]*client),
+               evictable:  make(map[string]evictNode),
+               registered: make(map[string]struct{}),
+               closer:     run.NewCloser(1),
        }
 }
 
@@ -185,6 +212,8 @@ type writeStream struct {
 type batchPublisher struct {
        pub     *pub
        streams map[string]writeStream
+       f       batchFuture
+       timeout time.Duration
 }
 
 func (bp *batchPublisher) Close() (err error) {
@@ -194,14 +223,23 @@ func (bp *batchPublisher) Close() (err error) {
        for i := range bp.streams {
                <-bp.streams[i].ctxDoneCh
        }
+       if bp.pub.closer.AddRunning() {
+               go func(f *batchFuture) {
+                       defer bp.pub.closer.Done()
+                       for _, n := range f.get() {
+                               bp.pub.failover(n)
+                       }
+               }(&bp.f)
+       }
        return err
 }
 
 func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) 
(bus.Future, error) {
+       var err error
        for _, m := range messages {
-               r, err := messageToRequest(topic, m)
-               if err != nil {
-                       err = multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, err))
+               r, errM2R := messageToRequest(topic, m)
+               if errM2R != nil {
+                       err = multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, errM2R))
                        continue
                }
                node := m.Node()
@@ -231,17 +269,17 @@ func (bp *batchPublisher) Publish(topic bus.Topic, 
messages ...bus.Message) (bus
                }
 
                bp.pub.mu.RLock()
-               client, ok := bp.pub.clients[node]
+               client, ok := bp.pub.active[node]
                bp.pub.mu.RUnlock()
                if !ok {
                        err = multierr.Append(err, fmt.Errorf("failed to get 
client for node %s", node))
                        continue
                }
-               ctx, cancel := context.WithTimeout(context.Background(), 
15*time.Second)
+               ctx, cancel := context.WithTimeout(context.Background(), 
bp.timeout)
                // this assignment is for getting around the go vet lint
                deferFn := cancel
                stream, errCreateStream := client.client.Send(ctx)
-               if err != nil {
+               if errCreateStream != nil {
                        err = multierr.Append(err, fmt.Errorf("failed to get 
stream for node %s: %w", node, errCreateStream))
                        continue
                }
@@ -249,9 +287,13 @@ func (bp *batchPublisher) Publish(topic bus.Topic, 
messages ...bus.Message) (bus
                        client:    stream,
                        ctxDoneCh: ctx.Done(),
                }
+               bp.f.events = append(bp.f.events, make(chan batchEvent))
                _ = sendData()
-               go func(s clusterv1.Service_SendClient, deferFn func()) {
-                       defer deferFn()
+               go func(s clusterv1.Service_SendClient, deferFn func(), bc chan 
batchEvent) {
+                       defer func() {
+                               close(bc)
+                               deferFn()
+                       }()
                        for {
                                _, errRecv := s.Recv()
                                if errRecv == nil {
@@ -260,11 +302,12 @@ func (bp *batchPublisher) Publish(topic bus.Topic, 
messages ...bus.Message) (bus
                                if errors.Is(errRecv, io.EOF) {
                                        return
                                }
-                               bp.pub.log.Err(errRecv).Msg("failed to receive 
message")
+                               bc <- batchEvent{n: node, e: errRecv}
+                               return
                        }
-               }(stream, deferFn)
+               }(stream, deferFn, bp.f.events[len(bp.f.events)-1])
        }
-       return nil, nil
+       return nil, err
 }
 
 func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, 
error) {
@@ -275,7 +318,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) 
(*clusterv1.SendRequest, e
        }
        message, ok := m.Data().(proto.Message)
        if !ok {
-               return nil, fmt.Errorf("invalid message type %T", m)
+               return nil, fmt.Errorf("invalid message type %T", m.Data())
        }
        anyMessage, err := anypb.New(message)
        if err != nil {
@@ -342,3 +385,54 @@ func (l *future) GetAll() ([]bus.Message, error) {
                ret = append(ret, m)
        }
 }
+
+type batchEvent struct {
+       e error
+       n string
+}
+
+type batchFuture struct {
+       errNodes map[string]struct{}
+       l        *logger.Logger
+       events   []chan batchEvent
+}
+
+func (b *batchFuture) get() []string {
+       var wg sync.WaitGroup
+       var mux sync.Mutex
+       wg.Add(len(b.events))
+       for _, e := range b.events {
+               go func(e chan batchEvent, mux *sync.Mutex) {
+                       defer wg.Done()
+                       for evt := range e {
+                               func() {
+                                       mux.Lock()
+                                       defer mux.Unlock()
+                                       // only log the error once for each node
+                                       if _, ok := b.errNodes[evt.n]; !ok && 
isFailoverError(evt.e) {
+                                               
b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
+                                               b.errNodes[evt.n] = struct{}{}
+                                               return
+                                       }
+                                       b.l.Error().Err(evt.e).Msgf("failed to 
send message to node %s", evt.n)
+                               }()
+                       }
+               }(e, &mux)
+       }
+       wg.Wait()
+       mux.Lock()
+       defer mux.Unlock()
+       var result []string
+       for n := range b.errNodes {
+               result = append(result, n)
+       }
+       return result
+}
+
+func isFailoverError(err error) bool {
+       s, ok := status.FromError(err)
+       if !ok {
+               return false
+       }
+       return s.Code() == codes.Unavailable || s.Code() == 
codes.DeadlineExceeded
+}
diff --git a/banyand/queue/pub/pub_suite_test.go 
b/banyand/queue/pub/pub_suite_test.go
new file mode 100644
index 00000000..79080d9a
--- /dev/null
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -0,0 +1,183 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "io"
+       "net"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/pkg/errors"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/anypb"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+func TestPub(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Publish Suite")
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+       gomega.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(gomega.Succeed())
+})
+
+type mockServer struct {
+       clusterv1.UnimplementedServiceServer
+       healthServer *health.Server
+       latency      time.Duration
+       code         codes.Code
+}
+
+func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
+       for {
+               req, err := stream.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return nil
+                       }
+                       return err
+               }
+
+               if s.code != codes.OK {
+                       s.healthServer.SetServingStatus("", 
grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+                       return status.Error(s.code, "mock error")
+               }
+
+               if s.latency > 0 {
+                       time.Sleep(s.latency)
+               }
+
+               topic, ok := data.TopicMap[req.Topic]
+
+               if !ok {
+                       panic("invalid topic")
+               }
+               f := data.TopicResponseMap[topic]
+               var body *anypb.Any
+               if f == nil {
+                       body = req.Body
+               } else {
+                       var errAny error
+                       body, errAny = anypb.New(f())
+                       if errAny != nil {
+                               panic(errAny)
+                       }
+               }
+
+               res := &clusterv1.SendResponse{
+                       MessageId: req.MessageId,
+                       Error:     "",
+                       Body:      body,
+               }
+
+               if err := stream.Send(res); err != nil {
+                       return err
+               }
+       }
+}
+
+func setup(address string, code codes.Code, latency time.Duration) func() {
+       s := grpc.NewServer()
+       hs := health.NewServer()
+       clusterv1.RegisterServiceServer(s, &mockServer{
+               code:         code,
+               latency:      latency,
+               healthServer: hs,
+       })
+       grpc_health_v1.RegisterHealthServer(s, hs)
+       go func() {
+               lis, err := net.Listen("tcp", address)
+               if err != nil {
+                       logger.Panicf("failed to listen: %v", err)
+                       return
+               }
+               if err := s.Serve(lis); err != nil {
+                       logger.Panicf("Server exited with error: %v", err)
+               }
+       }()
+       return s.GracefulStop
+}
+
+func getAddress() string {
+       ports, err := test.AllocateFreePorts(1)
+       if err != nil {
+               logger.Panicf("failed to allocate free ports: %v", err)
+               return ""
+       }
+       return net.JoinHostPort("localhost", strconv.Itoa(ports[0]))
+}
+
+type mockHandler struct {
+       addOrUpdateCount int
+       deleteCount      int
+}
+
+func (m *mockHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+       panic("no implemented")
+}
+
+func (m *mockHandler) OnAddOrUpdate(_ schema.Metadata) {
+       m.addOrUpdateCount++
+}
+
+func (m *mockHandler) OnDelete(_ schema.Metadata) {
+       m.deleteCount++
+}
+
+func newPub() *pub {
+       p := New(nil).(*pub)
+       p.log = logger.GetLogger("pub")
+       p.handler = &mockHandler{}
+       return p
+}
+
+func getDataNode(name string, address string) schema.Metadata {
+       return schema.Metadata{
+               TypeMeta: schema.TypeMeta{
+                       Name: name,
+                       Kind: schema.KindNode,
+               },
+               Spec: &databasev1.Node{
+                       Metadata: &commonv1.Metadata{
+                               Name: name,
+                       },
+                       Roles:       
[]databasev1.Role{databasev1.Role_ROLE_DATA},
+                       GrpcAddress: address,
+               },
+       }
+}
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
new file mode 100644
index 00000000..0f1bb250
--- /dev/null
+++ b/banyand/queue/pub/pub_test.go
@@ -0,0 +1,239 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "io"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "github.com/pkg/errors"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("Publish and Broadcast", func() {
+       var goods []gleak.Goroutine
+       ginkgo.BeforeEach(func() {
+               goods = gleak.Goroutines()
+       })
+       ginkgo.AfterEach(func() {
+               gomega.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+
+       ginkgo.Context("Publisher and batch publisher", func() {
+               ginkgo.It("should publish messages", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher(3 * time.Second)
+                       defer bp.Close()
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(data.TopicStreamWrite,
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", 
&streamv1.InternalWriteRequest{}),
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", 
&streamv1.InternalWriteRequest{}),
+                               )
+                               
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       }
+               })
+
+               ginkgo.It("should go to evict queue when node is unavailable", 
func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.Unavailable, 0)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher(3 * time.Second)
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(data.TopicStreamWrite,
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", 
&streamv1.InternalWriteRequest{}),
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", 
&streamv1.InternalWriteRequest{}),
+                               )
+                               if err != nil {
+                                       // The mock server will return io.EOF 
when the node is unavailable
+                                       // It will close the stream and return 
io.EOF to the send
+                                       gomega.Expect(errors.Is(err, io.EOF))
+                               }
+                       }
+                       
gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred())
+                       gomega.Eventually(func() int {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               return len(p.active)
+                       }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+                       func() {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               
gomega.Expect(p.evictable).Should(gomega.HaveLen(1))
+                               
gomega.Expect(p.evictable).Should(gomega.HaveKey("node2"))
+                               
gomega.Expect(p.active).Should(gomega.HaveKey("node1"))
+                       }()
+               })
+
+               ginkgo.It("should stay in active queue when operation takes a 
long time", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 0)
+                       closeFn2 := setup(addr2, codes.OK, 5*time.Second)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher(3 * time.Second)
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(data.TopicStreamWrite,
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", 
&streamv1.InternalWriteRequest{}),
+                                       
bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", 
&streamv1.InternalWriteRequest{}),
+                               )
+                               
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       }
+                       
gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred())
+                       gomega.Consistently(func() int {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               return len(p.active)
+                       }, "1s").Should(gomega.Equal(2))
+               })
+       })
+
+       ginkgo.Context("Broadcast", func() {
+               ginkgo.It("should broadcast messages", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, 
data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), 
&streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       messages, err := ff[0].GetAll()
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(messages).Should(gomega.HaveLen(1))
+               })
+
+               ginkgo.It("should broadcast messages to failed nodes", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.Unavailable, 0)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, 
data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), 
&streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       for i := range ff {
+                               _, err := ff[i].GetAll()
+                               ginkgo.GinkgoWriter.Printf("error: %v \n", err)
+                               if err != nil {
+                                       s, ok := status.FromError(err)
+                                       
gomega.Expect(ok).Should(gomega.BeTrue())
+                                       
gomega.Expect(s.Code()).Should(gomega.Equal(codes.Unavailable))
+                                       return
+                               }
+                       }
+                       ginkgo.Fail("should not reach here")
+               })
+
+               ginkgo.It("should broadcast messages to slow nodes", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 5*time.Second)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       ff, err := p.Broadcast(3*time.Second, 
data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), 
&streamv1.QueryRequest{}))
+                       gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       gomega.Expect(ff).Should(gomega.HaveLen(2))
+                       for i := range ff {
+                               _, err := ff[i].GetAll()
+                               ginkgo.GinkgoWriter.Printf("error: %v \n", err)
+                               if err != nil {
+                                       s, ok := status.FromError(err)
+                                       
gomega.Expect(ok).Should(gomega.BeTrue())
+                                       
gomega.Expect(s.Code()).Should(gomega.Equal(codes.DeadlineExceeded))
+                                       return
+                               }
+                       }
+                       ginkgo.Fail("should not reach here")
+               })
+       })
+})
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 1f4be29c..aad9141a 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
 
 import (
        "io"
+       "time"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -41,7 +42,7 @@ type Client interface {
        run.Unit
        bus.Publisher
        bus.Broadcaster
-       NewBatchPublisher() BatchPublisher
+       NewBatchPublisher(timeout time.Duration) BatchPublisher
        Register(schema.EventHandler)
 }
 
diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md
index 624bc828..3f49cda1 100644
--- a/docs/concept/clustering.md
+++ b/docs/concept/clustering.md
@@ -45,6 +45,8 @@ All nodes within a BanyanDB cluster communicate with other 
nodes according to th
 
 All nodes in the cluster are discovered by the Meta Nodes. When a node starts 
up, it registers itself with the Meta Nodes. The Meta Nodes then share this 
information with the Liaison Nodes which use it to route requests to the 
appropriate nodes.
 
+If data nodes cannot reach the meta nodes because of a network partition or 
other issues, their registrations are removed from the meta nodes. However, the 
liaison nodes keep those data nodes in their routing list until the data nodes 
also become unreachable from the liaison nodes' perspective. This ensures that 
the system keeps functioning even when some data nodes are temporarily 
invisible to the meta nodes.
+
 ## 3. **Data Organization**
 
 Different nodes in BanyanDB are responsible for different parts of the 
database, while Query and Liaison Nodes manage the routing and processing of 
queries.
@@ -177,3 +179,15 @@ User
 4. The results from each shard are then returned to the Liaison Node, which 
consolidates them into a single response to the user.
 
 This architecture allows BanyanDB to execute queries efficiently across a 
distributed system, leveraging the distributed query capabilities of the 
Liaison Node and the parallel processing of Data Nodes.
+
+## 7. Failover
+
+BanyanDB is designed to be highly available and fault-tolerant.
+
+In the case of a Data Node failure, the system automatically recovers and 
continues to operate.
+
+Liaison Nodes have a built-in mechanism to detect the failure of a Data Node. 
When a Data Node fails, the Liaison Node automatically routes requests to other 
available Data Nodes that hold the same shard, so the system remains 
operational even in the face of node failures. Because the query model lets 
Liaison Nodes reach all Data Nodes, queries can still be served while some Data 
Nodes are unavailable. When the failed Data Nodes are restored, the Liaison 
Nodes detect them again through periodic health checks and move them back into 
the active routing list.
+
+To tolerate Liaison Node failures, the system can be deployed with multiple 
Liaison Nodes for redundancy. If one Liaison Node fails, the remaining Liaison 
Nodes take over its responsibilities, keeping the system available.
+
+> Please note that any written request which triggers the failover process 
will be rejected, and the client should re-send the request.
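
A self-contained sketch of the health-check-and-backoff loop described above,
assuming the standard gRPC health service exposed by the data nodes. The
addr/stop/restore wiring and the package name are illustrative; the 2s probe
timeout and the 1s-to-20s exponential backoff mirror the constants added to
banyand/queue/pub/client.go in this commit, but this is not the committed code.

package example

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// isServing reports whether the node behind conn answers the standard gRPC
// health check with SERVING within the given timeout.
func isServing(conn *grpc.ClientConn, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	resp, err := grpc_health_v1.NewHealthClient(conn).Check(ctx,
		&grpc_health_v1.HealthCheckRequest{Service: ""})
	return err == nil && resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
}

// watchEvicted re-dials an evicted node with exponential backoff and invokes
// restore once the node is healthy again; stop ends the loop (e.g. when the
// node is deleted from the metadata registry).
func watchEvicted(addr string, stop <-chan struct{}, restore func(*grpc.ClientConn)) {
	backoff := time.Second
	const maxBackoff = 20 * time.Second
	for {
		select {
		case <-time.After(backoff):
			conn, err := grpc.Dial(addr,
				grpc.WithTransportCredentials(insecure.NewCredentials()))
			if err == nil && isServing(conn, 2*time.Second) {
				restore(conn) // move the node back to the active routing list
				return
			}
			if conn != nil {
				_ = conn.Close()
			}
			if backoff *= 2; backoff > maxBackoff {
				backoff = maxBackoff
			}
		case <-stop:
			return
		}
	}
}
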
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 26c5be17..425bc60f 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -23,6 +23,7 @@ import (
        "errors"
        "io"
        "sync"
+       "time"
 
        "go.uber.org/multierr"
 
@@ -102,7 +103,7 @@ type Publisher interface {
 
 // Broadcaster allow sending Messages to a Topic and receiving the responses.
 type Broadcaster interface {
-       Broadcast(topic Topic, message Message) ([]Future, error)
+       Broadcast(timeout time.Duration, topic Topic, message Message) 
([]Future, error)
 }
 
 type channel chan event
diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go
index 4e578774..3f815022 100644
--- a/pkg/grpchelper/client.go
+++ b/pkg/grpchelper/client.go
@@ -70,11 +70,11 @@ func Request(ctx context.Context, rpcTimeout time.Duration, 
fn func(rpcCtx conte
        err := fn(rpcCtx)
        if err != nil {
                if stat, ok := status.FromError(err); ok && stat.Code() == 
codes.Unimplemented {
-                       l.Warn().Str("stat", stat.Message()).Msg("error: this 
server does not implement the service")
+                       l.Info().Str("stat", stat.Message()).Msg("error: this 
server does not implement the service")
                } else if stat, ok := status.FromError(err); ok && stat.Code() 
== codes.DeadlineExceeded {
-                       l.Warn().Dur("rpcTimeout", rpcTimeout).Msg("timeout: 
rpc did not complete within")
+                       l.Info().Dur("rpcTimeout", rpcTimeout).Msg("timeout: 
rpc did not complete within")
                } else {
-                       l.Error().Err(err).Msg("error: rpc failed:")
+                       l.Info().Err(err).Msg("error: rpc failed:")
                }
                return err
        }
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index 8f35f4b4..d0aee004 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -20,6 +20,7 @@ package measure
 import (
        "context"
        "fmt"
+       "time"
 
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
@@ -35,6 +36,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+const defaultQueryTimeout = 10 * time.Second
+
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
 type unresolvedDistributed struct {
@@ -138,11 +141,11 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(executor.MIterator, erro
        if t.maxDataPointsSize > 0 {
                query.Limit = t.maxDataPointsSize
        }
-       ff, err := dctx.Broadcast(data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       var allErr error
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
        }
-       var allErr error
        var see []sort.Iterator[*comparableDataPoint]
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index 6f524737..8c3ed930 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -20,6 +20,7 @@ package stream
 import (
        "context"
        "fmt"
+       "time"
 
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
@@ -35,6 +36,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+const defaultQueryTimeout = 10 * time.Second
+
 var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil)
 
 type unresolvedDistributed struct {
@@ -128,7 +131,7 @@ func (t *distributedPlan) Execute(ctx context.Context) 
([]*streamv1.Element, err
        if t.maxElementSize > 0 {
                query.Limit = t.maxElementSize
        }
-       ff, err := dctx.Broadcast(data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
        if err != nil {
                return nil, err
        }
@@ -152,7 +155,7 @@ func (t *distributedPlan) Execute(ctx context.Context) 
([]*streamv1.Element, err
        for iter.Next() {
                result = append(result, iter.Val().Element)
        }
-       return result, nil
+       return result, allErr
 }
 
 func (t *distributedPlan) String() string {
diff --git a/test/stress/istio/istio_suite_test.go 
b/test/stress/istio/istio_suite_test.go
index 5377b72e..a1945da0 100644
--- a/test/stress/istio/istio_suite_test.go
+++ b/test/stress/istio/istio_suite_test.go
@@ -66,10 +66,6 @@ var _ = g.Describe("Istio", func() {
                path, deferFn, err := test.NewSpace()
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                measurePath := filepath.Join(path, "measure")
-               g.DeferCleanup(func() {
-                       printDiskUsage(measurePath, 5, 0)
-                       deferFn()
-               })
                var ports []int
                ports, err = test.AllocateFreePorts(4)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -80,6 +76,8 @@ var _ = g.Describe("Istio", func() {
                g.DeferCleanup(func() {
                        time.Sleep(time.Minute)
                        closerServerFunc()
+                       printDiskUsage(measurePath, 5, 0)
+                       deferFn()
                })
                gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 
10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
                        flags.EventuallyTimeout).Should(gomega.Succeed())
