This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch publish
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 257e06586302fc7022077dfac897657d759a940b
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Apr 15 03:26:09 2024 +0000

    Add test cases
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/queue/pub/client.go           | 140 ++++++++++++++++++++++++++++--
 banyand/queue/pub/client_test.go      |  90 ++++++++++++++++++++
 banyand/queue/pub/pub.go              | 112 +++++++++++++++++++-----
 banyand/queue/pub/pub_suite_test.go   | 156 ++++++++++++++++++++++++++++++++++
 banyand/queue/pub/pub_test.go         | 111 ++++++++++++++++++++++++
 pkg/grpchelper/client.go              |   6 +-
 test/stress/istio/istio_suite_test.go |   6 +-
 7 files changed, 586 insertions(+), 35 deletions(-)

diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 72014607..7bdc77da 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -18,17 +18,29 @@
 package pub
 
 import (
+       "context"
        "fmt"
+       "time"
 
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/health/grpc_health_v1"
 
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+const rpcTimeout = 2 * time.Second
+
 var (
+       // Retry policy for health check.
+       initBackoff       = time.Second
+       maxBackoff        = 20 * time.Second
+       backoffMultiplier = 2.0
+
        serviceName = clusterv1.Service_ServiceDesc.ServiceName
 
        // The timeout is set by each RPC.
@@ -49,6 +61,7 @@ var (
 type client struct {
        client clusterv1.ServiceClient
        conn   *grpc.ClientConn
+       md     schema.Metadata
 }
 
 func (p *pub) OnAddOrUpdate(md schema.Metadata) {
@@ -88,16 +101,25 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        if _, ok := p.clients[name]; ok {
                return
        }
+       if _, ok := p.evictClients[name]; ok {
+               return
+       }
        conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
        if err != nil {
                p.log.Error().Err(err).Msg("failed to connect to grpc server")
                return
        }
+
+       if !p.checkClient(conn, md) {
+               return
+       }
+
        c := clusterv1.NewServiceClient(conn)
-       p.clients[name] = &client{conn: conn, client: c}
+       p.clients[name] = &client{conn: conn, client: c, md: md}
        if p.handler != nil {
                p.handler.OnAddOrUpdate(md)
        }
+       p.log.Info().Stringer("node", node).Msg("new node is healthy, add it to active queue")
 }
 
 func (p *pub) OnDelete(md schema.Metadata) {
@@ -116,14 +138,122 @@ func (p *pub) OnDelete(md schema.Metadata) {
        }
        p.mu.Lock()
        defer p.mu.Unlock()
+       if en, ok := p.evictClients[name]; ok {
+               close(en.c)
+               delete(p.evictClients, name)
+               p.log.Info().Stringer("node", node).Msg("node is removed from evict queue by delete event")
+               return
+       }
 
-       if client, ok := p.clients[name]; ok {
-               if client.conn != nil {
-                       client.conn.Close() // Close the client connection
-               }
+       if client, ok := p.clients[name]; ok && !p.healthCheck(node, client.conn) {
+               _ = client.conn.Close()
                delete(p.clients, name)
                if p.handler != nil {
                        p.handler.OnDelete(md)
                }
+               p.log.Info().Stringer("node", node).Msg("node is removed from active queue by delete event")
+       }
+}
+
+func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
+       node, ok := md.Spec.(*databasev1.Node)
+       if !ok {
+               logger.Panicf("failed to cast node spec")
+               return false
+       }
+       if p.healthCheck(node, conn) {
+               return true
+       }
+       _ = conn.Close()
+       if !p.closer.AddRunning() {
+               return false
+       }
+       p.log.Info().Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
+       name := node.Metadata.Name
+       p.evictClients[name] = evictNode{n: node, c: make(chan struct{})}
+       if p.handler != nil {
+               p.handler.OnDelete(md)
+       }
+       go func(p *pub, name string, en evictNode, md schema.Metadata) {
+               defer p.closer.Done()
+               backoff := initBackoff
+               for {
+                       select {
+                       case <-time.After(backoff):
+                               connEvict, errEvict := grpc.Dial(node.GrpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
+                               if errEvict == nil && p.healthCheck(en.n, connEvict) {
+                                       p.mu.Lock()
+                                       defer p.mu.Unlock()
+                                       if _, ok := p.evictClients[name]; !ok {
+                                               // The client has been removed from evict clients map, just return
+                                               return
+                                       }
+                                       c := clusterv1.NewServiceClient(connEvict)
+                                       p.clients[name] = &client{conn: connEvict, client: c, md: md}
+                                       if p.handler != nil {
+                                               p.handler.OnAddOrUpdate(md)
+                                       }
+                                       delete(p.evictClients, name)
+                                       p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
+                                       return
+                               }
+                               if errEvict == nil {
+                                       _ = connEvict.Close()
+                               }
+                               p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
+                       case <-en.c:
+                               return
+                       }
+                       if backoff < maxBackoff {
+                               backoff *= time.Duration(backoffMultiplier)
+                       } else {
+                               backoff = maxBackoff
+                       }
+               }
+       }(p, name, p.evictClients[name], md)
+       return false
+}
+
+func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
+       var resp *grpc_health_v1.HealthCheckResponse
+       if err := grpchelper.Request(context.Background(), rpcTimeout, func(rpcCtx context.Context) (err error) {
+               resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx,
+                       &grpc_health_v1.HealthCheckRequest{
+                               Service: "",
+                       })
+               return err
+       }); err != nil {
+               if e := p.log.Debug(); e.Enabled() {
+                       e.Err(err).Stringer("node", node).Msg("service unhealthy")
+               }
+               return false
+       }
+       if resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING {
+               return true
        }
+       return false
+}
+
+func (p *pub) failover(node string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if en, ok := p.evictClients[node]; ok {
+               close(en.c)
+               delete(p.evictClients, node)
+               p.log.Info().Str("node", node).Msg("node is removed from evict queue by wire event")
+               return
+       }
+
+       if client, ok := p.clients[node]; ok && !p.checkClient(client.conn, client.md) {
+               _ = client.conn.Close()
+               delete(p.clients, node)
+               if p.handler != nil {
+                       p.handler.OnDelete(client.md)
+               }
+       }
+}
+
+type evictNode struct {
+       n *databasev1.Node
+       c chan struct{}
 }
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
new file mode 100644
index 00000000..83200aa9
--- /dev/null
+++ b/banyand/queue/pub/client_test.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc/codes"
+
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("publish clients register/unregister", func() {
+       var goods []gleak.Goroutine
+       ginkgo.BeforeEach(func() {
+               goods = gleak.Goroutines()
+       })
+       ginkgo.AfterEach(func() {
+               gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+       ginkgo.It("should register and unregister clients", func() {
+               addr1 := getAddress()
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               p := newPub()
+               defer func() {
+                       p.GracefulStop()
+                       closeFn()
+               }()
+               node1 := getDataNode("node1", addr1)
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 1, 0, 1, 0)
+               addr2 := getAddress()
+               node2 := getDataNode("node2", addr2)
+               p.OnAddOrUpdate(node2)
+               verifyClients(p, 1, 1, 1, 1)
+
+               p.OnDelete(node1)
+               verifyClients(p, 1, 1, 1, 1)
+               p.OnDelete(node2)
+               verifyClients(p, 1, 0, 1, 1)
+               closeFn()
+               p.OnDelete(node1)
+               verifyClients(p, 0, 0, 1, 2)
+       })
+
+       ginkgo.It("should move back to active queue", func() {
+               addr1 := getAddress()
+               node1 := getDataNode("node1", addr1)
+               p := newPub()
+               defer p.GracefulStop()
+               p.OnAddOrUpdate(node1)
+               verifyClients(p, 0, 1, 0, 1)
+               closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+               defer closeFn()
+               gomega.Eventually(func() int {
+                       p.mu.RLock()
+                       defer p.mu.RUnlock()
+                       return len(p.clients)
+               }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+               verifyClients(p, 1, 0, 1, 1)
+       })
+})
+
+func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+       gomega.Expect(p.clients).Should(gomega.HaveLen(active))
+       gomega.Expect(p.evictClients).Should(gomega.HaveLen(evict))
+       h := p.handler.(*mockHandler)
+       gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
+       gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d2576e51..cbe1cd50 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -47,12 +47,13 @@ var (
 
 type pub struct {
        schema.UnimplementedOnInitHandler
-       metadata metadata.Repo
-       handler  schema.EventHandler
-       log      *logger.Logger
-       clients  map[string]*client
-       closer   *run.Closer
-       mu       sync.RWMutex
+       metadata     metadata.Repo
+       handler      schema.EventHandler
+       log          *logger.Logger
+       clients      map[string]*client
+       evictClients map[string]evictNode
+       closer       *run.Closer
+       mu           sync.RWMutex
 }
 
 func (p *pub) Register(handler schema.EventHandler) {
@@ -60,13 +61,18 @@ func (p *pub) Register(handler schema.EventHandler) {
 }
 
 func (p *pub) GracefulStop() {
-       p.closer.Done()
-       p.closer.CloseThenWait()
        p.mu.Lock()
        defer p.mu.Unlock()
+       for i := range p.evictClients {
+               close(p.evictClients[i].c)
+       }
+       p.evictClients = nil
+       p.closer.Done()
+       p.closer.CloseThenWait()
        for _, c := range p.clients {
                _ = c.conn.Close()
        }
+       p.clients = nil
 }
 
 // Serve implements run.Service.
@@ -85,7 +91,6 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
        var wg sync.WaitGroup
        for _, n := range names {
                wg.Add(1)
-               // Send a value into sem. If sem is full, this will block until there's room.
                go func(n string) {
                        defer wg.Done()
                        f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
@@ -124,7 +129,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
        handleMessage := func(m bus.Message, err error) error {
                r, errSend := messageToRequest(topic, m)
                if errSend != nil {
-                       return multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, errSend))
+                       return multierr.Append(err, fmt.Errorf("failed to 
marshal message[%d]: %w", m.ID(), errSend))
                }
                node := m.Node()
                p.mu.RLock()
@@ -155,15 +160,20 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
 
 // NewBatchPublisher returns a new batch publisher.
 func (p *pub) NewBatchPublisher() queue.BatchPublisher {
-       return &batchPublisher{pub: p, streams: make(map[string]writeStream)}
+       return &batchPublisher{
+               pub:     p,
+               streams: make(map[string]writeStream),
+               f:       batchFuture{errNodes: make(map[string]struct{}), l: p.log},
+       }
 }
 
 // New returns a new queue client.
 func New(metadata metadata.Repo) queue.Client {
        return &pub{
-               metadata: metadata,
-               clients:  make(map[string]*client),
-               closer:   run.NewCloser(1),
+               metadata:     metadata,
+               clients:      make(map[string]*client),
+               evictClients: make(map[string]evictNode),
+               closer:       run.NewCloser(1),
        }
 }
 
@@ -185,6 +195,7 @@ type writeStream struct {
 type batchPublisher struct {
        pub     *pub
        streams map[string]writeStream
+       f       batchFuture
 }
 
 func (bp *batchPublisher) Close() (err error) {
@@ -194,14 +205,23 @@ func (bp *batchPublisher) Close() (err error) {
        for i := range bp.streams {
                <-bp.streams[i].ctxDoneCh
        }
+       if bp.pub.closer.AddRunning() {
+               go func(f *batchFuture) {
+                       defer bp.pub.closer.Done()
+                       for _, n := range f.get() {
+                               bp.pub.failover(n)
+                       }
+               }(&bp.f)
+       }
        return err
 }
 
 func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+       var err error
        for _, m := range messages {
-               r, err := messageToRequest(topic, m)
-               if err != nil {
-                       err = multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, err))
+               r, errM2R := messageToRequest(topic, m)
+               if errM2R != nil {
+                       err = multierr.Append(err, fmt.Errorf("failed to 
marshal message %T: %w", m, errM2R))
                        continue
                }
                node := m.Node()
@@ -249,9 +269,13 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                        client:    stream,
                        ctxDoneCh: ctx.Done(),
                }
+               bp.f.events = append(bp.f.events, make(chan batchEvent))
                _ = sendData()
-               go func(s clusterv1.Service_SendClient, deferFn func()) {
-                       defer deferFn()
+               go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent) {
+                       defer func() {
+                               close(bc)
+                               deferFn()
+                       }()
                        for {
                                _, errRecv := s.Recv()
                                if errRecv == nil {
@@ -260,11 +284,12 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                                if errors.Is(errRecv, io.EOF) {
                                        return
                                }
-                               bp.pub.log.Err(errRecv).Msg("failed to receive 
message")
+                               bc <- batchEvent{n: node, e: errRecv}
+                               return
                        }
-               }(stream, deferFn)
+               }(stream, deferFn, bp.f.events[len(bp.f.events)-1])
        }
-       return nil, nil
+       return nil, err
 }
 
 func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) {
@@ -275,7 +300,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
        }
        message, ok := m.Data().(proto.Message)
        if !ok {
-               return nil, fmt.Errorf("invalid message type %T", m)
+               return nil, fmt.Errorf("invalid message type %T", m.Data())
        }
        anyMessage, err := anypb.New(message)
        if err != nil {
@@ -342,3 +367,44 @@ func (l *future) GetAll() ([]bus.Message, error) {
                ret = append(ret, m)
        }
 }
+
+type batchEvent struct {
+       e error
+       n string
+}
+
+type batchFuture struct {
+       errNodes map[string]struct{}
+       l        *logger.Logger
+       events   []chan batchEvent
+}
+
+func (b *batchFuture) get() []string {
+       var wg sync.WaitGroup
+       var mux sync.Mutex
+       wg.Add(len(b.events))
+       for _, e := range b.events {
+               go func(e chan batchEvent, mux *sync.Mutex) {
+                       defer wg.Done()
+                       for evt := range e {
+                               func() {
+                                       mux.Lock()
+                                       defer mux.Unlock()
+                                       // only log the error once for each node
+                                       if _, ok := b.errNodes[evt.n]; !ok {
+                                               b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
+                                       }
+                                       b.errNodes[evt.n] = struct{}{}
+                               }()
+                       }
+               }(e, &mux)
+       }
+       wg.Wait()
+       mux.Lock()
+       defer mux.Unlock()
+       var result []string
+       for n := range b.errNodes {
+               result = append(result, n)
+       }
+       return result
+}
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
new file mode 100644
index 00000000..96bee0ac
--- /dev/null
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "io"
+       "net"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/pkg/errors"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/health"
+       "google.golang.org/grpc/health/grpc_health_v1"
+       "google.golang.org/grpc/status"
+
+       clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestPub(t *testing.T) {
+       gomega.RegisterFailHandler(ginkgo.Fail)
+       ginkgo.RunSpecs(t, "Publish Suite")
+}
+
+type mockServer struct {
+       clusterv1.UnimplementedServiceServer
+       healthServer *health.Server
+       latency      time.Duration
+       code         codes.Code
+}
+
+func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
+       for {
+               req, err := stream.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return nil
+                       }
+                       return err
+               }
+
+               if s.code != codes.OK {
+                       s.healthServer.SetServingStatus("", 
grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+                       return status.Error(s.code, "mock error")
+               }
+
+               if s.latency > 0 {
+                       time.Sleep(s.latency)
+               }
+
+               res := &clusterv1.SendResponse{
+                       MessageId: req.MessageId,
+                       Error:     "",
+                       Body:      req.Body,
+               }
+
+               if err := stream.Send(res); err != nil {
+                       panic(err)
+               }
+       }
+}
+
+func setup(address string, code codes.Code, latency time.Duration) func() {
+       s := grpc.NewServer()
+       hs := health.NewServer()
+       clusterv1.RegisterServiceServer(s, &mockServer{
+               code:         code,
+               latency:      latency,
+               healthServer: hs,
+       })
+       grpc_health_v1.RegisterHealthServer(s, hs)
+       go func() {
+               lis, err := net.Listen("tcp", address)
+               if err != nil {
+                       logger.Panicf("failed to listen: %v", err)
+                       return
+               }
+               if err := s.Serve(lis); err != nil {
+                       logger.Panicf("Server exited with error: %v", err)
+               }
+       }()
+       return s.GracefulStop
+}
+
+func getAddress() string {
+       ports, err := test.AllocateFreePorts(1)
+       if err != nil {
+               logger.Panicf("failed to allocate free ports: %v", err)
+               return ""
+       }
+       return net.JoinHostPort("localhost", strconv.Itoa(ports[0]))
+}
+
+type mockHandler struct {
+       addOrUpdateCount int
+       deleteCount      int
+}
+
+func (m *mockHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+       panic("no implemented")
+}
+
+func (m *mockHandler) OnAddOrUpdate(_ schema.Metadata) {
+       m.addOrUpdateCount++
+}
+
+func (m *mockHandler) OnDelete(_ schema.Metadata) {
+       m.deleteCount++
+}
+
+func newPub() *pub {
+       p := New(nil).(*pub)
+       p.log = logger.GetLogger("pub")
+       p.handler = &mockHandler{}
+       return p
+}
+
+func getDataNode(name string, address string) schema.Metadata {
+       return schema.Metadata{
+               TypeMeta: schema.TypeMeta{
+                       Name: name,
+                       Kind: schema.KindNode,
+               },
+               Spec: &databasev1.Node{
+                       Metadata: &commonv1.Metadata{
+                               Name: name,
+                       },
+               Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+                       GrpcAddress: address,
+               },
+       }
+}
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
new file mode 100644
index 00000000..b68d19cc
--- /dev/null
+++ b/banyand/queue/pub/pub_test.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc/codes"
+
+       streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("publish clients register/unregister", func() {
+       var goods []gleak.Goroutine
+       ginkgo.BeforeEach(func() {
+               goods = gleak.Goroutines()
+       })
+       ginkgo.AfterEach(func() {
+               gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+
+       ginkgo.Context("publisher and batch publisher", func() {
+               ginkgo.It("should publish messages", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher()
+                       defer bp.Close()
+                       t := bus.UniTopic("test")
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(t,
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
+                               )
+                               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       }
+               })
+
+               ginkgo.FIt("should go to evict queue when node is unavailable", 
func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+                       closeFn2 := setup(addr2, codes.Unavailable, 0)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher()
+                       t := bus.UniTopic("test")
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(t,
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
+                               )
+                               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       }
+                       gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred())
+                       gomega.Eventually(func() int {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               return len(p.clients)
+                       }, flags.EventuallyTimeout).Should(gomega.Equal(1))
+                       func() {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               gomega.Expect(p.evictClients).Should(gomega.HaveLen(1))
+                               gomega.Expect(p.evictClients).Should(gomega.HaveKey("node2"))
+                               gomega.Expect(p.clients).Should(gomega.HaveKey("node1"))
+                       }()
+               })
+       })
+})
diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go
index 4e578774..3f815022 100644
--- a/pkg/grpchelper/client.go
+++ b/pkg/grpchelper/client.go
@@ -70,11 +70,11 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte
        err := fn(rpcCtx)
        if err != nil {
                if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented {
-                       l.Warn().Str("stat", stat.Message()).Msg("error: this server does not implement the service")
+                       l.Info().Str("stat", stat.Message()).Msg("error: this server does not implement the service")
                } else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded {
-                       l.Warn().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within")
+                       l.Info().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within")
                } else {
-                       l.Error().Err(err).Msg("error: rpc failed:")
+                       l.Info().Err(err).Msg("error: rpc failed:")
                }
                return err
        }
diff --git a/test/stress/istio/istio_suite_test.go b/test/stress/istio/istio_suite_test.go
index 5377b72e..a1945da0 100644
--- a/test/stress/istio/istio_suite_test.go
+++ b/test/stress/istio/istio_suite_test.go
@@ -66,10 +66,6 @@ var _ = g.Describe("Istio", func() {
                path, deferFn, err := test.NewSpace()
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                measurePath := filepath.Join(path, "measure")
-               g.DeferCleanup(func() {
-                       printDiskUsage(measurePath, 5, 0)
-                       deferFn()
-               })
                var ports []int
                ports, err = test.AllocateFreePorts(4)
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -80,6 +76,8 @@ var _ = g.Describe("Istio", func() {
                g.DeferCleanup(func() {
                        time.Sleep(time.Minute)
                        closerServerFunc()
+                       printDiskUsage(measurePath, 5, 0)
+                       deferFn()
                })
                gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
                        flags.EventuallyTimeout).Should(gomega.Succeed())
