This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 110fd09440a0fc094863d67884e6847afe0e74bc
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Sep 4 19:45:50 2025 +0800

    Refactor retry policy handling in pub client
    
    - Updated the pub client to use an instance-specific retry policy
      instead of a global one, so error handling can be configured per
      instance (see the sketch after this list).
    - Introduced a test helper that creates a pub instance with a custom
      retry policy that does not retry on Unavailable errors, making
      failure scenarios deterministic to exercise in tests.
    - Adjusted tests to use the new helper, keeping retry behavior
      consistent during message publishing.
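
    Background for the change: in gRPC-Go, a retry policy is part of a
    service-config JSON passed to grpc.WithDefaultServiceConfig when the
    connection is created, so storing that JSON on each pub instance lets a
    test swap in its own policy before dialing. The following is a minimal
    sketch of the pattern, assuming grpc-go v1.63+ (for grpc.NewClient);
    the pubClient type and dial function are illustrative stand-ins, not
    the actual BanyanDB code.

        package pubsketch

        import (
                "google.golang.org/grpc"
                "google.golang.org/grpc/credentials/insecure"
        )

        // pubClient carries its own service-config JSON instead of reading
        // a package-level constant, mirroring the field this commit adds.
        type pubClient struct {
                retryPolicy string
        }

        // dial opens a connection whose calls follow this instance's retry
        // policy; grpc.WithDefaultServiceConfig applies the JSON unless the
        // name resolver supplies a service config of its own.
        func (p *pubClient) dial(address string) (*grpc.ClientConn, error) {
                return grpc.NewClient(address,
                        grpc.WithTransportCredentials(insecure.NewCredentials()),
                        grpc.WithDefaultServiceConfig(p.retryPolicy),
                )
        }

    A test can then assign a policy whose RetryableStatusCodes omits
    UNAVAILABLE before any connection is dialed, which is what
    newPubWithNoRetry does in pub_suite_test.go below.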
---
 banyand/queue/pub/client.go         |  4 ++--
 banyand/queue/pub/pub.go            | 10 ++++++----
 banyand/queue/pub/pub_suite_test.go | 28 ++++++++++++++++++++++++++++
 banyand/queue/pub/pub_test.go       | 20 ++++++++++----------
 4 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 482df7a0..74543ec4 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -225,7 +225,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
                p.log.Error().Err(err).Msg("failed to load client TLS credentials")
                return
        }
-       conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+       conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(p.retryPolicy))...)
        if err != nil {
                p.log.Error().Err(err).Msg("failed to connect to grpc server")
                return
@@ -469,7 +469,7 @@ func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Met
                                        p.log.Error().Err(errEvict).Msg("failed to load client TLS credentials (evict)")
                                        return
                                }
-                               connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+                               connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(p.retryPolicy))...)
                                if errEvict == nil && p.healthCheck(en.n.String(), connEvict) {
                                        func() {
                                                p.mu.Lock()
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index b7c17df0..7a40046e 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -64,16 +64,17 @@ var (
 type pub struct {
        schema.UnimplementedOnInitHandler
        metadata        metadata.Repo
-       writableProbe   map[string]map[string]struct{}
-       cbStates        map[string]*circuitState
+       handlers        map[bus.Topic]schema.EventHandler
+       log             *logger.Logger
        registered      map[string]*databasev1.Node
        active          map[string]*client
        evictable       map[string]evictNode
        closer          *run.Closer
-       handlers        map[bus.Topic]schema.EventHandler
-       log             *logger.Logger
+       writableProbe   map[string]map[string]struct{}
+       cbStates        map[string]*circuitState
        caCertPath      string
        prefix          string
+       retryPolicy     string
        allowedRoles    []databasev1.Role
        mu              sync.RWMutex
        cbMu            sync.RWMutex
@@ -305,6 +306,7 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
                prefix:        strBuilder.String(),
                writableProbe: make(map[string]map[string]struct{}),
                cbStates:      make(map[string]*circuitState),
+               retryPolicy:   retryPolicy,
        }
        return p
 }
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index bb5c0f7d..fab13eb4 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -222,6 +222,34 @@ func newPub(roles ...databasev1.Role) *pub {
        return p.(*pub)
 }
 
+// newPubWithNoRetry creates a pub with a retry policy that doesn't retry Unavailable errors.
+func newPubWithNoRetry(roles ...databasev1.Role) *pub {
+       p := New(nil, roles...)
+       p.(*pub).log = logger.GetLogger("queue-client")
+       p.Register(data.TopicStreamWrite, &mockHandler{})
+       p.Register(data.TopicMeasureWrite, &mockHandler{})
+
+       // Override the retry policy to not retry Unavailable errors
+       noRetryPolicy := `{
+               "methodConfig": [
+                 {
+                   "name": [{"service": "banyandb.cluster.v1.Service"}],
+                   "waitForReady": true,
+                   "retryPolicy": {
+                       "MaxAttempts": 2,
+                       "InitialBackoff": ".5s",
+                       "MaxBackoff": "10s",
+                       "BackoffMultiplier": 2.0,
+                       "RetryableStatusCodes": [ "DEADLINE_EXCEEDED", 
"RESOURCE_EXHAUSTED" ]
+                   }
+                 }
+               ]}`
+
+       // Replace the default retry policy with the no-retry variant
+       p.(*pub).retryPolicy = noRetryPolicy
+       return p.(*pub)
+}
+
 func getDataNode(name string, address string) schema.Metadata {
        return schema.Metadata{
                TypeMeta: schema.TypeMeta{
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index a8f4251b..5e614831 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -64,7 +64,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher(3 * time.Second)
+                       bp := p.NewBatchPublisher(15 * time.Second)
                        ctx := context.TODO()
                        for i := 0; i < 10; i++ {
                                _, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -94,7 +94,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher(3 * time.Second)
+                       bp := p.NewBatchPublisher(15 * time.Second)
                        ctx := context.TODO()
                        for i := 0; i < 10; i++ {
                                _, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -141,7 +141,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher(3 * time.Second)
+                       bp := p.NewBatchPublisher(15 * time.Second)
                        ctx := context.TODO()
                        for i := 0; i < 10; i++ {
                                _, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -178,7 +178,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher(3 * time.Second)
+                       bp := p.NewBatchPublisher(15 * time.Second)
                        ctx := context.TODO()
                        for i := 0; i < 10; i++ {
                                _, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -217,7 +217,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        p.OnAddOrUpdate(node2)
 
                        req := &streamv1.QueryRequest{}
-                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), req))
+                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), req))
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                        gomega.Expect(ff).Should(gomega.HaveLen(2))
                        messages, err := ff[0].GetAll()
@@ -230,7 +230,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        addr2 := getAddress()
                        closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
                        closeFn2 := setup(addr2, codes.Unavailable, 0)
-                       p := newPub()
+                       p := newPubWithNoRetry()
                        defer func() {
                                p.GracefulStop()
                                closeFn1()
@@ -241,7 +241,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                        gomega.Expect(ff).Should(gomega.HaveLen(2))
                        for i := range ff {
@@ -273,7 +273,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                        gomega.Expect(ff).Should(gomega.HaveLen(2))
                        for i := range ff {
@@ -326,7 +326,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                group1: {""},
                        }
 
-                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
+                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery,
                                bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, timeRange, &streamv1.QueryRequest{}))
 
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
@@ -377,7 +377,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                                group1: {"role=ingest"},
                        }
 
-                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery,
+                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery,
                                bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, timeRange, &streamv1.QueryRequest{}))
 
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
