This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 0eb6b7bd1815ecdfc602ba0c8ab03a63acaa0271
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Sep 4 20:05:42 2025 +0800

    Refactor pub client by extracting circuit breaker and retry logic into dedicated files
---
 banyand/queue/pub/circuitbreaker.go | 147 +++++++++++++++++++++
 banyand/queue/pub/client.go         | 254 +-----------------------------------
 banyand/queue/pub/pub_test.go       |   2 +-
 banyand/queue/pub/retry.go          | 161 +++++++++++++++++++++++
 4 files changed, 310 insertions(+), 254 deletions(-)
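
For reference, a minimal sketch of how the extracted helpers might be wired together on a send path. The publishWithRetry wrapper, the send callback, and the node argument are illustrative placeholders only and are not part of this patch; isRequestAllowed, recordSuccess, recordFailure, isTransientError, jitteredBackoff, and the default* constants are the ones introduced in the files below.

    // Hypothetical wiring inside package pub (not part of this patch).
    func publishWithRetry(p *pub, node string, send func() error) error {
            var lastErr error
            for attempt := 0; attempt < defaultMaxRetries; attempt++ {
                    // Circuit breaker gate: Open rejects, Half-Open admits a single probe.
                    if !p.isRequestAllowed(node) {
                            return fmt.Errorf("circuit breaker open for node %s", node)
                    }
                    err := send()
                    if err == nil {
                            p.recordSuccess(node) // Half-Open -> Closed on success
                            return nil
                    }
                    lastErr = err
                    p.recordFailure(node, err) // only transient/internal errors count toward opening
                    if !isTransientError(err) {
                            return err // non-retryable, fail fast
                    }
                    // Exponential backoff with bounded symmetric jitter before the next attempt.
                    time.Sleep(jitteredBackoff(defaultBackoffBase, defaultBackoffMax, attempt, defaultJitterFactor))
            }
            return lastErr
    }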

diff --git a/banyand/queue/pub/circuitbreaker.go b/banyand/queue/pub/circuitbreaker.go
new file mode 100644
index 00000000..7bb64e29
--- /dev/null
+++ b/banyand/queue/pub/circuitbreaker.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "time"
+)
+
+const (
+       defaultCBThreshold    = 5
+       defaultCBResetTimeout = 60 * time.Second
+)
+
+// CircuitState defines the circuit breaker states.
+type CircuitState int
+
+// CircuitState defines the circuit breaker states.
+const (
+       StateClosed   CircuitState = iota // Normal operation
+       StateOpen                         // Reject requests until cooldown expires
+       StateHalfOpen                     // Allow a single probe
+)
+
+// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns.
+type circuitState struct {
+       lastFailureTime       time.Time
+       openTime              time.Time
+       state                 CircuitState
+       consecutiveFailures   int
+       halfOpenProbeInFlight bool
+}
+
+// isRequestAllowed checks if a request to the given node is allowed based on circuit breaker state.
+// It also handles state transitions from Open to Half-Open when cooldown expires.
+func (p *pub) isRequestAllowed(node string) bool {
+       p.cbMu.Lock()
+       defer p.cbMu.Unlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               return true // No circuit breaker state, allow request
+       }
+
+       switch cb.state {
+       case StateClosed:
+               return true
+       case StateOpen:
+               // Check if cooldown period has expired
+               if time.Since(cb.openTime) >= defaultCBResetTimeout {
+                       // Transition to Half-Open to allow a single probe request
+                       cb.state = StateHalfOpen
+                       cb.halfOpenProbeInFlight = true // Set token for the probe
+                       p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open")
+                       return true
+               }
+               return false // Still in cooldown period
+       case StateHalfOpen:
+               // In half-open state, deny requests if probe is already in flight
+               if cb.halfOpenProbeInFlight {
+                       return false // Probe already in progress, deny additional requests
+               }
+               // This case should not normally happen since we set the token on transition,
+               // but handle it defensively by allowing the request and setting the token
+               cb.halfOpenProbeInFlight = true
+               return true
+       default:
+               return true
+       }
+}
+
+// recordSuccess resets the circuit breaker state to Closed on successful operation.
+// This handles Half-Open -> Closed transitions.
+func (p *pub) recordSuccess(node string) {
+       p.cbMu.Lock()
+       defer p.cbMu.Unlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               // Initialize circuit breaker state
+               p.cbStates[node] = &circuitState{
+                       state:               StateClosed,
+                       consecutiveFailures: 0,
+               }
+               return
+       }
+
+       // Reset to closed state
+       cb.state = StateClosed
+       cb.consecutiveFailures = 0
+       cb.lastFailureTime = time.Time{}
+       cb.openTime = time.Time{}
+       cb.halfOpenProbeInFlight = false // Clear probe token
+}
+
+// recordFailure updates the circuit breaker state on failed operation.
+// Only records failures for transient/internal errors that should count toward opening the circuit.
+func (p *pub) recordFailure(node string, err error) {
+       // Only record failure if the error is transient or internal
+       if !isTransientError(err) && !isInternalError(err) {
+               return
+       }
+       p.cbMu.Lock()
+       defer p.cbMu.Unlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               // Initialize circuit breaker state
+               cb = &circuitState{
+                       state:               StateClosed,
+                       consecutiveFailures: 1,
+                       lastFailureTime:     time.Now(),
+               }
+               p.cbStates[node] = cb
+       } else {
+               cb.consecutiveFailures++
+               cb.lastFailureTime = time.Now()
+       }
+
+       // Check if we should open the circuit
+       threshold := defaultCBThreshold
+       if cb.consecutiveFailures >= threshold && cb.state == StateClosed {
+               cb.state = StateOpen
+               cb.openTime = time.Now()
+               p.log.Warn().Str("node", node).Int("failures", cb.consecutiveFailures).Msg("circuit breaker opened")
+       } else if cb.state == StateHalfOpen {
+               // Failed during half-open, go back to open
+               cb.state = StateOpen
+               cb.openTime = time.Now()
+               cb.halfOpenProbeInFlight = false // Clear probe token
+               p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure")
+       }
+}
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 74543ec4..bcca7868 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -19,16 +19,11 @@ package pub
 
 import (
        "context"
-       "crypto/rand"
-       "errors"
        "fmt"
-       "math/big"
        "time"
 
        "google.golang.org/grpc"
-       "google.golang.org/grpc/codes"
        "google.golang.org/grpc/health/grpc_health_v1"
-       "google.golang.org/grpc/status"
 
        "github.com/apache/skywalking-banyandb/api/common"
        clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
@@ -41,21 +36,10 @@ import (
 )
 
 const (
-       rpcTimeout               = 2 * time.Second
-       defaultJitterFactor      = 0.2
-       defaultMaxRetries        = 3
-       defaultCBThreshold       = 5
-       defaultCBResetTimeout    = 60 * time.Second
-       defaultPerRequestTimeout = 2 * time.Second
-       defaultBackoffBase       = 500 * time.Millisecond
-       defaultBackoffMax        = 30 * time.Second
+       rpcTimeout = 2 * time.Second
 )
 
 var (
-       // Retry policy for health check.
-       initBackoff = time.Second
-       maxBackoff  = 20 * time.Second
-
        // The timeout is set by each RPC.
        retryPolicy = `{
        "methodConfig": [
@@ -126,48 +110,8 @@ var (
            }
          }
        ]}`
-
-       // Retryable gRPC status codes for streaming send retries.
-       retryableCodes = map[codes.Code]bool{
-               codes.OK:                 false,
-               codes.Canceled:           false,
-               codes.Unknown:            false,
-               codes.InvalidArgument:    false,
-               codes.DeadlineExceeded:   true, // Retryable - operation exceeded deadline
-               codes.NotFound:           false,
-               codes.AlreadyExists:      false,
-               codes.PermissionDenied:   false,
-               codes.ResourceExhausted:  true, // Retryable - server resource limits exceeded
-               codes.FailedPrecondition: false,
-               codes.Aborted:            false,
-               codes.OutOfRange:         false,
-               codes.Unimplemented:      false,
-               codes.Internal:           true, // Retryable - internal server error should participate in circuit breaker
-               codes.Unavailable:        true, // Retryable - service temporarily unavailable
-               codes.DataLoss:           false,
-               codes.Unauthenticated:    false,
-       }
-)
-
-// CircuitState defines the circuit breaker states.
-type CircuitState int
-
-// CircuitState defines the circuit breaker states.
-const (
-       StateClosed   CircuitState = iota // Normal operation
-       StateOpen                         // Reject requests until cooldown expires
-       StateHalfOpen                     // Allow a single probe
 )
 
-// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns.
-type circuitState struct {
-       lastFailureTime       time.Time
-       openTime              time.Time
-       state                 CircuitState
-       consecutiveFailures   int
-       halfOpenProbeInFlight bool
-}
-
 type client struct {
        client clusterv1.ServiceClient
        conn   *grpc.ClientConn
@@ -346,101 +290,6 @@ func (p *pub) removeNodeIfUnhealthy(md schema.Metadata, node *databasev1.Node, c
        return true
 }
 
-// secureRandFloat64 generates a cryptographically secure random float64 in [0, 1).
-func secureRandFloat64() float64 {
-       // Generate a random uint64
-       maxVal := big.NewInt(1 << 53) // Use 53 bits for precision similar to math/rand
-       n, err := rand.Int(rand.Reader, maxVal)
-       if err != nil {
-               // Fallback to a reasonable value if crypto/rand fails
-               return 0.5
-       }
-       return float64(n.Uint64()) / float64(1<<53)
-}
-
-// jitteredBackoff calculates backoff duration with jitter to avoid thundering herds.
-// Uses bounded symmetric jitter: backoff * (1 + jitter * (rand() - 0.5) * 2).
-func jitteredBackoff(baseBackoff, maxBackoff time.Duration, attempt int, jitterFactor float64) time.Duration {
-       if jitterFactor < 0 {
-               jitterFactor = 0
-       }
-       if jitterFactor > 1 {
-               jitterFactor = 1
-       }
-
-       // Exponential backoff: base * 2^attempt
-       backoff := baseBackoff
-       for i := 0; i < attempt; i++ {
-               backoff *= 2
-               if backoff > maxBackoff {
-                       backoff = maxBackoff
-                       break
-               }
-       }
-
-       // Apply jitter: backoff * (1 + jitter * (rand() - 0.5) * 2)
-       // This gives us a range of [backoff * (1-jitter), backoff * (1+jitter)]
-       jitterRange := float64(backoff) * jitterFactor
-       randomFloat := secureRandFloat64()
-       randomOffset := (randomFloat - 0.5) * 2 * jitterRange
-
-       jitteredDuration := time.Duration(float64(backoff) + randomOffset)
-       if jitteredDuration < 0 {
-               jitteredDuration = baseBackoff / 10 // Minimum backoff
-       }
-       if jitteredDuration > maxBackoff {
-               jitteredDuration = maxBackoff
-       }
-
-       return jitteredDuration
-}
-
-// isTransientError checks if the error is considered transient and retryable.
-func isTransientError(err error) bool {
-       if err == nil {
-               return false
-       }
-
-       // Handle gRPC status errors
-       if s, ok := status.FromError(err); ok {
-               return retryableCodes[s.Code()]
-       }
-
-       // Handle common.Error types
-       var ce *common.Error
-       if errors.As(err, &ce) {
-               // Map common status to gRPC codes for consistency
-               switch ce.Status() {
-               case modelv1.Status_STATUS_INTERNAL_ERROR:
-                       return retryableCodes[codes.Internal]
-               default:
-                       return false
-               }
-       }
-
-       return false
-}
-
-// isInternalError checks if the error is an internal server error.
-func isInternalError(err error) bool {
-       if err == nil {
-               return false
-       }
-
-       // Handle gRPC status errors
-       if s, ok := status.FromError(err); ok {
-               return s.Code() == codes.Internal
-       }
-
-       // Handle common.Error types
-       var ce *common.Error
-       if errors.As(err, &ce) {
-               return ce.Status() == modelv1.Status_STATUS_INTERNAL_ERROR
-       }
-
-       return false
-}
-
 func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Metadata) bool {
        node, ok := md.Spec.(*databasev1.Node)
        if !ok {
@@ -671,104 +520,3 @@ type evictNode struct {
        n *databasev1.Node
        c chan struct{}
 }
-
-// isRequestAllowed checks if a request to the given node is allowed based on circuit breaker state.
-// It also handles state transitions from Open to Half-Open when cooldown expires.
-func (p *pub) isRequestAllowed(node string) bool {
-       p.cbMu.Lock()
-       defer p.cbMu.Unlock()
-
-       cb, exists := p.cbStates[node]
-       if !exists {
-               return true // No circuit breaker state, allow request
-       }
-
-       switch cb.state {
-       case StateClosed:
-               return true
-       case StateOpen:
-               // Check if cooldown period has expired
-               if time.Since(cb.openTime) >= defaultCBResetTimeout {
-                       // Transition to Half-Open to allow a single probe request
-                       cb.state = StateHalfOpen
-                       cb.halfOpenProbeInFlight = true // Set token for the probe
-                       p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open")
-                       return true
-               }
-               return false // Still in cooldown period
-       case StateHalfOpen:
-               // In half-open state, deny requests if probe is already in flight
-               if cb.halfOpenProbeInFlight {
-                       return false // Probe already in progress, deny additional requests
-               }
-               // This case should not normally happen since we set the token on transition,
-               // but handle it defensively by allowing the request and setting the token
-               cb.halfOpenProbeInFlight = true
-               return true
-       default:
-               return true
-       }
-}
-
-// recordSuccess resets the circuit breaker state to Closed on successful operation.
-// This handles Half-Open -> Closed transitions.
-func (p *pub) recordSuccess(node string) {
-       p.cbMu.Lock()
-       defer p.cbMu.Unlock()
-
-       cb, exists := p.cbStates[node]
-       if !exists {
-               // Initialize circuit breaker state
-               p.cbStates[node] = &circuitState{
-                       state:               StateClosed,
-                       consecutiveFailures: 0,
-               }
-               return
-       }
-
-       // Reset to closed state
-       cb.state = StateClosed
-       cb.consecutiveFailures = 0
-       cb.lastFailureTime = time.Time{}
-       cb.openTime = time.Time{}
-       cb.halfOpenProbeInFlight = false // Clear probe token
-}
-
-// recordFailure updates the circuit breaker state on failed operation.
-// Only records failures for transient/internal errors that should count toward opening the circuit.
-func (p *pub) recordFailure(node string, err error) {
-       // Only record failure if the error is transient or internal
-       if !isTransientError(err) && !isInternalError(err) {
-               return
-       }
-       p.cbMu.Lock()
-       defer p.cbMu.Unlock()
-
-       cb, exists := p.cbStates[node]
-       if !exists {
-               // Initialize circuit breaker state
-               cb = &circuitState{
-                       state:               StateClosed,
-                       consecutiveFailures: 1,
-                       lastFailureTime:     time.Now(),
-               }
-               p.cbStates[node] = cb
-       } else {
-               cb.consecutiveFailures++
-               cb.lastFailureTime = time.Now()
-       }
-
-       // Check if we should open the circuit
-       threshold := defaultCBThreshold
-       if cb.consecutiveFailures >= threshold && cb.state == StateClosed {
-               cb.state = StateOpen
-               cb.openTime = time.Now()
-               p.log.Warn().Str("node", node).Int("failures", cb.consecutiveFailures).Msg("circuit breaker opened")
-       } else if cb.state == StateHalfOpen {
-               // Failed during half-open, go back to open
-               cb.state = StateOpen
-               cb.openTime = time.Now()
-               cb.halfOpenProbeInFlight = false // Clear probe token
-               p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure")
-       }
-}
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index 5e614831..d6c8237d 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -273,7 +273,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+                       ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
                        gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                        gomega.Expect(ff).Should(gomega.HaveLen(2))
                        for i := range ff {
diff --git a/banyand/queue/pub/retry.go b/banyand/queue/pub/retry.go
new file mode 100644
index 00000000..1adfa0e2
--- /dev/null
+++ b/banyand/queue/pub/retry.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+       "crypto/rand"
+       "errors"
+       "math/big"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+const (
+       defaultJitterFactor      = 0.2
+       defaultMaxRetries        = 3
+       defaultPerRequestTimeout = 2 * time.Second
+       defaultBackoffBase       = 500 * time.Millisecond
+       defaultBackoffMax        = 30 * time.Second
+)
+
+var (
+       // Retry policy for health check.
+       initBackoff = time.Second
+       maxBackoff  = 20 * time.Second
+
+       // Retryable gRPC status codes for streaming send retries.
+       retryableCodes = map[codes.Code]bool{
+               codes.OK:                 false,
+               codes.Canceled:           false,
+               codes.Unknown:            false,
+               codes.InvalidArgument:    false,
+               codes.DeadlineExceeded:   true, // Retryable - operation exceeded deadline
+               codes.NotFound:           false,
+               codes.AlreadyExists:      false,
+               codes.PermissionDenied:   false,
+               codes.ResourceExhausted:  true, // Retryable - server resource limits exceeded
+               codes.FailedPrecondition: false,
+               codes.Aborted:            false,
+               codes.OutOfRange:         false,
+               codes.Unimplemented:      false,
+               codes.Internal:           true, // Retryable - internal server error should participate in circuit breaker
+               codes.Unavailable:        true, // Retryable - service temporarily unavailable
+               codes.DataLoss:           false,
+               codes.Unauthenticated:    false,
+       }
+)
+
+// secureRandFloat64 generates a cryptographically secure random float64 in [0, 1).
+func secureRandFloat64() float64 {
+       // Generate a random uint64
+       maxVal := big.NewInt(1 << 53) // Use 53 bits for precision similar to math/rand
+       n, err := rand.Int(rand.Reader, maxVal)
+       if err != nil {
+               // Fallback to a reasonable value if crypto/rand fails
+               return 0.5
+       }
+       return float64(n.Uint64()) / float64(1<<53)
+}
+
+// jitteredBackoff calculates backoff duration with jitter to avoid thundering herds.
+// Uses bounded symmetric jitter: backoff * (1 + jitter * (rand() - 0.5) * 2).
+func jitteredBackoff(baseBackoff, maxBackoff time.Duration, attempt int, jitterFactor float64) time.Duration {
+       if jitterFactor < 0 {
+               jitterFactor = 0
+       }
+       if jitterFactor > 1 {
+               jitterFactor = 1
+       }
+
+       // Exponential backoff: base * 2^attempt
+       backoff := baseBackoff
+       for i := 0; i < attempt; i++ {
+               backoff *= 2
+               if backoff > maxBackoff {
+                       backoff = maxBackoff
+                       break
+               }
+       }
+
+       // Apply jitter: backoff * (1 + jitter * (rand() - 0.5) * 2)
+       // This gives us a range of [backoff * (1-jitter), backoff * (1+jitter)]
+       jitterRange := float64(backoff) * jitterFactor
+       randomFloat := secureRandFloat64()
+       randomOffset := (randomFloat - 0.5) * 2 * jitterRange
+
+       jitteredDuration := time.Duration(float64(backoff) + randomOffset)
+       if jitteredDuration < 0 {
+               jitteredDuration = baseBackoff / 10 // Minimum backoff
+       }
+       if jitteredDuration > maxBackoff {
+               jitteredDuration = maxBackoff
+       }
+
+       return jitteredDuration
+}
+
+// isTransientError checks if the error is considered transient and retryable.
+func isTransientError(err error) bool {
+       if err == nil {
+               return false
+       }
+
+       // Handle gRPC status errors
+       if s, ok := status.FromError(err); ok {
+               return retryableCodes[s.Code()]
+       }
+
+       // Handle common.Error types
+       var ce *common.Error
+       if errors.As(err, &ce) {
+               // Map common status to gRPC codes for consistency
+               switch ce.Status() {
+               case modelv1.Status_STATUS_INTERNAL_ERROR:
+                       return retryableCodes[codes.Internal]
+               default:
+                       return false
+               }
+       }
+
+       return false
+}
+
+// isInternalError checks if the error is an internal server error.
+func isInternalError(err error) bool {
+       if err == nil {
+               return false
+       }
+
+       // Handle gRPC status errors
+       if s, ok := status.FromError(err); ok {
+               return s.Code() == codes.Internal
+       }
+
+       // Handle common.Error types
+       var ce *common.Error
+       if errors.As(err, &ce) {
+               return ce.Status() == modelv1.Status_STATUS_INTERNAL_ERROR
+       }
+
+       return false
+}
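
For reference, with the defaults in retry.go (base 500ms, max 30s, jitter factor 0.2), attempt 2 yields a nominal backoff of 500ms * 2^2 = 2s, which the symmetric jitter spreads over roughly [1.6s, 2.4s] before the next attempt.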
