This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 110fd09440a0fc094863d67884e6847afe0e74bc
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Sep 4 19:45:50 2025 +0800

    Refactor retry policy handling in pub client

    - Updated the pub client to use an instance-specific retry policy instead
      of a global one, enhancing flexibility in error handling.
    - Introduced a new function to create a pub instance with a custom retry
      policy that does not retry on Unavailable errors, improving test coverage
      and behavior during failure scenarios.
    - Adjusted tests to utilize the new pub instance creation method, ensuring
      consistent behavior in retry logic during message publishing.
---
 banyand/queue/pub/client.go         |  4 ++--
 banyand/queue/pub/pub.go            | 10 ++++++----
 banyand/queue/pub/pub_suite_test.go | 28 ++++++++++++++++++++++++++++
 banyand/queue/pub/pub_test.go       | 20 ++++++++++----------
 4 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 482df7a0..74543ec4 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -225,7 +225,7 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
 		p.log.Error().Err(err).Msg("failed to load client TLS credentials")
 		return
 	}
-	conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+	conn, err := grpc.NewClient(address, append(credOpts, grpc.WithDefaultServiceConfig(p.retryPolicy))...)
 	if err != nil {
 		p.log.Error().Err(err).Msg("failed to connect to grpc server")
 		return
@@ -469,7 +469,7 @@ func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Met
 			p.log.Error().Err(errEvict).Msg("failed to load client TLS credentials (evict)")
 			return
 		}
-		connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(retryPolicy))...)
+		connEvict, errEvict := grpc.NewClient(node.GrpcAddress, append(credOpts, grpc.WithDefaultServiceConfig(p.retryPolicy))...)
 		if errEvict == nil && p.healthCheck(en.n.String(), connEvict) {
 			func() {
 				p.mu.Lock()
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index b7c17df0..7a40046e 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -64,16 +64,17 @@ var (
 type pub struct {
 	schema.UnimplementedOnInitHandler
 	metadata      metadata.Repo
-	writableProbe map[string]map[string]struct{}
-	cbStates      map[string]*circuitState
+	handlers      map[bus.Topic]schema.EventHandler
+	log           *logger.Logger
 	registered    map[string]*databasev1.Node
 	active        map[string]*client
 	evictable     map[string]evictNode
 	closer        *run.Closer
-	handlers      map[bus.Topic]schema.EventHandler
-	log           *logger.Logger
+	writableProbe map[string]map[string]struct{}
+	cbStates      map[string]*circuitState
 	caCertPath    string
 	prefix        string
+	retryPolicy   string
 	allowedRoles  []databasev1.Role
 	mu            sync.RWMutex
 	cbMu          sync.RWMutex
@@ -305,6 +306,7 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client {
 		prefix:        strBuilder.String(),
 		writableProbe: make(map[string]map[string]struct{}),
 		cbStates:      make(map[string]*circuitState),
+		retryPolicy:   retryPolicy,
 	}
 	return p
 }
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index bb5c0f7d..fab13eb4 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -222,6 +222,34 @@ func newPub(roles ...databasev1.Role) *pub {
 	return p.(*pub)
 }
 
+// newPubWithNoRetry creates a pub with a retry policy that doesn't retry Unavailable errors.
+func newPubWithNoRetry(roles ...databasev1.Role) *pub {
+	p := New(nil, roles...)
+	p.(*pub).log = logger.GetLogger("queue-client")
+	p.Register(data.TopicStreamWrite, &mockHandler{})
+	p.Register(data.TopicMeasureWrite, &mockHandler{})
+
+	// Override the retry policy to not retry Unavailable errors
+	noRetryPolicy := `{
+		"methodConfig": [
+		  {
+			"name": [{"service": "banyandb.cluster.v1.Service"}],
+			"waitForReady": true,
+			"retryPolicy": {
+			  "MaxAttempts": 2,
+			  "InitialBackoff": ".5s",
+			  "MaxBackoff": "10s",
+			  "BackoffMultiplier": 2.0,
+			  "RetryableStatusCodes": [ "DEADLINE_EXCEEDED", "RESOURCE_EXHAUSTED" ]
+			}
+		  }
+		]}`
+
+	// Store the original retry policy and replace it
+	p.(*pub).retryPolicy = noRetryPolicy
+	return p.(*pub)
+}
+
 func getDataNode(name string, address string) schema.Metadata {
 	return schema.Metadata{
 		TypeMeta: schema.TypeMeta{
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index a8f4251b..5e614831 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -64,7 +64,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			bp := p.NewBatchPublisher(3 * time.Second)
+			bp := p.NewBatchPublisher(15 * time.Second)
 			ctx := context.TODO()
 			for i := 0; i < 10; i++ {
 				_, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -94,7 +94,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			bp := p.NewBatchPublisher(3 * time.Second)
+			bp := p.NewBatchPublisher(15 * time.Second)
 			ctx := context.TODO()
 			for i := 0; i < 10; i++ {
 				_, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -141,7 +141,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			bp := p.NewBatchPublisher(3 * time.Second)
+			bp := p.NewBatchPublisher(15 * time.Second)
 			ctx := context.TODO()
 			for i := 0; i < 10; i++ {
 				_, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -178,7 +178,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			bp := p.NewBatchPublisher(3 * time.Second)
+			bp := p.NewBatchPublisher(15 * time.Second)
 			ctx := context.TODO()
 			for i := 0; i < 10; i++ {
 				_, err := bp.Publish(ctx, data.TopicStreamWrite,
@@ -217,7 +217,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			p.OnAddOrUpdate(node2)
 
 			req := &streamv1.QueryRequest{}
-			ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), req))
+			ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), req))
 			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 			gomega.Expect(ff).Should(gomega.HaveLen(2))
 			messages, err := ff[0].GetAll()
@@ -230,7 +230,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			addr2 := getAddress()
 			closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
 			closeFn2 := setup(addr2, codes.Unavailable, 0)
-			p := newPub()
+			p := newPubWithNoRetry()
 			defer func() {
 				p.GracefulStop()
 				closeFn1()
@@ -241,7 +241,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+			ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
 			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 			gomega.Expect(ff).Should(gomega.HaveLen(2))
 			for i := range ff {
@@ -273,7 +273,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
ginkgo.Describe("Publish and Broadcast", func() { node2 := getDataNode("node2", addr2) p.OnAddOrUpdate(node2) - ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{})) + ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{})) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) gomega.Expect(ff).Should(gomega.HaveLen(2)) for i := range ff { @@ -326,7 +326,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { group1: {""}, } - ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, + ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, timeRange, &streamv1.QueryRequest{})) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) @@ -377,7 +377,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() { group1: {"role=ingest"}, } - ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, + ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessageWithNodeSelectors(bus.MessageID(1), nodeSelectors, timeRange, &streamv1.QueryRequest{})) gomega.Expect(err).ShouldNot(gomega.HaveOccurred())