This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0eb6b7bd1815ecdfc602ba0c8ab03a63acaa0271
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Sep 4 20:05:42 2025 +0800

    Refactor pub client by removing unused circuit breaker and retry logic
---
 banyand/queue/pub/circuitbreaker.go | 147 +++++++++++++++
 banyand/queue/pub/client.go         | 254 +-----------------------------------
 banyand/queue/pub/pub_test.go       |   2 +-
 banyand/queue/pub/retry.go          | 161 +++++++++++++++++
 4 files changed, 310 insertions(+), 254 deletions(-)

diff --git a/banyand/queue/pub/circuitbreaker.go b/banyand/queue/pub/circuitbreaker.go
new file mode 100644
index 00000000..7bb64e29
--- /dev/null
+++ b/banyand/queue/pub/circuitbreaker.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+	"time"
+)
+
+const (
+	defaultCBThreshold    = 5
+	defaultCBResetTimeout = 60 * time.Second
+)
+
+// CircuitState defines the circuit breaker states.
+type CircuitState int
+
+// CircuitState defines the circuit breaker states.
+const (
+	StateClosed   CircuitState = iota // Normal operation
+	StateOpen                         // Reject requests until cooldown expires
+	StateHalfOpen                     // Allow a single probe
+)
+
+// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns.
+type circuitState struct {
+	lastFailureTime       time.Time
+	openTime              time.Time
+	state                 CircuitState
+	consecutiveFailures   int
+	halfOpenProbeInFlight bool
+}
+
+// isRequestAllowed checks if a request to the given node is allowed based on circuit breaker state.
+// It also handles state transitions from Open to Half-Open when cooldown expires.
+func (p *pub) isRequestAllowed(node string) bool {
+	p.cbMu.Lock()
+	defer p.cbMu.Unlock()
+
+	cb, exists := p.cbStates[node]
+	if !exists {
+		return true // No circuit breaker state, allow request
+	}
+
+	switch cb.state {
+	case StateClosed:
+		return true
+	case StateOpen:
+		// Check if cooldown period has expired
+		if time.Since(cb.openTime) >= defaultCBResetTimeout {
+			// Transition to Half-Open to allow a single probe request
+			cb.state = StateHalfOpen
+			cb.halfOpenProbeInFlight = true // Set token for the probe
+			p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open")
+			return true
+		}
+		return false // Still in cooldown period
+	case StateHalfOpen:
+		// In half-open state, deny requests if probe is already in flight
+		if cb.halfOpenProbeInFlight {
+			return false // Probe already in progress, deny additional requests
+		}
+		// This case should not normally happen since we set the token on transition,
+		// but handle it defensively by allowing the request and setting the token
+		cb.halfOpenProbeInFlight = true
+		return true
+	default:
+		return true
+	}
+}
+
+// recordSuccess resets the circuit breaker state to Closed on successful operation.
+// This handles Half-Open -> Closed transitions.
+func (p *pub) recordSuccess(node string) {
+	p.cbMu.Lock()
+	defer p.cbMu.Unlock()
+
+	cb, exists := p.cbStates[node]
+	if !exists {
+		// Initialize circuit breaker state
+		p.cbStates[node] = &circuitState{
+			state:               StateClosed,
+			consecutiveFailures: 0,
+		}
+		return
+	}
+
+	// Reset to closed state
+	cb.state = StateClosed
+	cb.consecutiveFailures = 0
+	cb.lastFailureTime = time.Time{}
+	cb.openTime = time.Time{}
+	cb.halfOpenProbeInFlight = false // Clear probe token
+}
+
+// recordFailure updates the circuit breaker state on failed operation.
+// Only records failures for transient/internal errors that should count toward opening the circuit.
+func (p *pub) recordFailure(node string, err error) {
+	// Only record failure if the error is transient or internal
+	if !isTransientError(err) && !isInternalError(err) {
+		return
+	}
+	p.cbMu.Lock()
+	defer p.cbMu.Unlock()
+
+	cb, exists := p.cbStates[node]
+	if !exists {
+		// Initialize circuit breaker state
+		cb = &circuitState{
+			state:               StateClosed,
+			consecutiveFailures: 1,
+			lastFailureTime:     time.Now(),
+		}
+		p.cbStates[node] = cb
+	} else {
+		cb.consecutiveFailures++
+		cb.lastFailureTime = time.Now()
+	}
+
+	// Check if we should open the circuit
+	threshold := defaultCBThreshold
+	if cb.consecutiveFailures >= threshold && cb.state == StateClosed {
+		cb.state = StateOpen
+		cb.openTime = time.Now()
+		p.log.Warn().Str("node", node).Int("failures", cb.consecutiveFailures).Msg("circuit breaker opened")
+	} else if cb.state == StateHalfOpen {
+		// Failed during half-open, go back to open
+		cb.state = StateOpen
+		cb.openTime = time.Now()
+		cb.halfOpenProbeInFlight = false // Clear probe token
+		p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure")
+	}
+}
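For context, these three hooks are meant to bracket every outbound call: gate the send on isRequestAllowed, then report the outcome so the per-node state machine can advance. A minimal sketch of that wiring, assuming pub carries the cbMu mutex and cbStates map used above (sendWithBreaker and doSend are illustrative names, not part of this commit):

    // Hypothetical wrapper showing the intended call order of the hooks.
    func (p *pub) sendWithBreaker(node string, doSend func() error) error {
    	if !p.isRequestAllowed(node) { // may flip Open -> Half-Open after cooldown
    		return fmt.Errorf("circuit breaker open for node %s", node)
    	}
    	if err := doSend(); err != nil {
    		p.recordFailure(node, err) // counts only transient/internal errors
    		return err
    	}
    	p.recordSuccess(node) // closes the breaker and clears the probe token
    	return nil
    }

The Half-Open token matters here: only one probe is admitted per cooldown window, and that probe's outcome alone decides whether the breaker closes or reopens.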
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 74543ec4..bcca7868 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -19,16 +19,11 @@ package pub
 
 import (
 	"context"
-	"crypto/rand"
-	"errors"
 	"fmt"
-	"math/big"
 	"time"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/health/grpc_health_v1"
-	"google.golang.org/grpc/status"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
@@ -41,21 +36,10 @@ import (
 )
 
 const (
-	rpcTimeout               = 2 * time.Second
-	defaultJitterFactor      = 0.2
-	defaultMaxRetries        = 3
-	defaultCBThreshold       = 5
-	defaultCBResetTimeout    = 60 * time.Second
-	defaultPerRequestTimeout = 2 * time.Second
-	defaultBackoffBase       = 500 * time.Millisecond
-	defaultBackoffMax        = 30 * time.Second
+	rpcTimeout = 2 * time.Second
 )
 
 var (
-	// Retry policy for health check.
-	initBackoff = time.Second
-	maxBackoff  = 20 * time.Second
-
 	// The timeout is set by each RPC.
 	retryPolicy = `{
 	"methodConfig": [
@@ -126,48 +110,8 @@ var (
 	    }
 	  }
 	]}`
-
-	// Retryable gRPC status codes for streaming send retries.
-	retryableCodes = map[codes.Code]bool{
-		codes.OK:                 false,
-		codes.Canceled:           false,
-		codes.Unknown:            false,
-		codes.InvalidArgument:    false,
-		codes.DeadlineExceeded:   true, // Retryable - operation exceeded deadline
-		codes.NotFound:           false,
-		codes.AlreadyExists:      false,
-		codes.PermissionDenied:   false,
-		codes.ResourceExhausted:  true, // Retryable - server resource limits exceeded
-		codes.FailedPrecondition: false,
-		codes.Aborted:            false,
-		codes.OutOfRange:         false,
-		codes.Unimplemented:      false,
-		codes.Internal:           true, // Retryable - internal server error should participate in circuit breaker
-		codes.Unavailable:        true, // Retryable - service temporarily unavailable
-		codes.DataLoss:           false,
-		codes.Unauthenticated:    false,
-	}
-)
-
-// CircuitState defines the circuit breaker states.
-type CircuitState int
-
-// CircuitState defines the circuit breaker states.
-const (
-	StateClosed   CircuitState = iota // Normal operation
-	StateOpen                         // Reject requests until cooldown expires
-	StateHalfOpen                     // Allow a single probe
 )
-
-// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns.
-type circuitState struct {
-	lastFailureTime       time.Time
-	openTime              time.Time
-	state                 CircuitState
-	consecutiveFailures   int
-	halfOpenProbeInFlight bool
-}
 
 type client struct {
 	client clusterv1.ServiceClient
 	conn   *grpc.ClientConn
@@ -346,101 +290,6 @@ func (p *pub) removeNodeIfUnhealthy(md schema.Metadata, node *databasev1.Node, c
 	return true
 }
 
-// secureRandFloat64 generates a cryptographically secure random float64 in [0, 1).
-func secureRandFloat64() float64 {
-	// Generate a random uint64
-	maxVal := big.NewInt(1 << 53) // Use 53 bits for precision similar to math/rand
-	n, err := rand.Int(rand.Reader, maxVal)
-	if err != nil {
-		// Fallback to a reasonable value if crypto/rand fails
-		return 0.5
-	}
-	return float64(n.Uint64()) / float64(1<<53)
-}
-
-// jitteredBackoff calculates backoff duration with jitter to avoid thundering herds.
-// Uses bounded symmetric jitter: backoff * (1 + jitter * (rand() - 0.5) * 2).
-func jitteredBackoff(baseBackoff, maxBackoff time.Duration, attempt int, jitterFactor float64) time.Duration {
-	if jitterFactor < 0 {
-		jitterFactor = 0
-	}
-	if jitterFactor > 1 {
-		jitterFactor = 1
-	}
-
-	// Exponential backoff: base * 2^attempt
-	backoff := baseBackoff
-	for i := 0; i < attempt; i++ {
-		backoff *= 2
-		if backoff > maxBackoff {
-			backoff = maxBackoff
-			break
-		}
-	}
-
-	// Apply jitter: backoff * (1 + jitter * (rand() - 0.5) * 2)
-	// This gives us a range of [backoff * (1-jitter), backoff * (1+jitter)]
-	jitterRange := float64(backoff) * jitterFactor
-	randomFloat := secureRandFloat64()
-	randomOffset := (randomFloat - 0.5) * 2 * jitterRange
-
-	jitteredDuration := time.Duration(float64(backoff) + randomOffset)
-	if jitteredDuration < 0 {
-		jitteredDuration = baseBackoff / 10 // Minimum backoff
-	}
-	if jitteredDuration > maxBackoff {
-		jitteredDuration = maxBackoff
-	}
-
-	return jitteredDuration
-}
-
-// isTransientError checks if the error is considered transient and retryable.
-func isTransientError(err error) bool {
-	if err == nil {
-		return false
-	}
-
-	// Handle gRPC status errors
-	if s, ok := status.FromError(err); ok {
-		return retryableCodes[s.Code()]
-	}
-
-	// Handle common.Error types
-	var ce *common.Error
-	if errors.As(err, &ce) {
-		// Map common status to gRPC codes for consistency
-		switch ce.Status() {
-		case modelv1.Status_STATUS_INTERNAL_ERROR:
-			return retryableCodes[codes.Internal]
-		default:
-			return false
-		}
-	}
-
-	return false
-}
-
-// isInternalError checks if the error is an internal server error.
-func isInternalError(err error) bool {
-	if err == nil {
-		return false
-	}
-
-	// Handle gRPC status errors
-	if s, ok := status.FromError(err); ok {
-		return s.Code() == codes.Internal
-	}
-
-	// Handle common.Error types
-	var ce *common.Error
-	if errors.As(err, &ce) {
-		return ce.Status() == modelv1.Status_STATUS_INTERNAL_ERROR
-	}
-
-	return false
-}
-
 func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Metadata) bool {
 	node, ok := md.Spec.(*databasev1.Node)
 	if !ok {
@@ -671,104 +520,3 @@ type evictNode struct {
 	n *databasev1.Node
 	c chan struct{}
 }
-
-// isRequestAllowed checks if a request to the given node is allowed based on circuit breaker state.
-// It also handles state transitions from Open to Half-Open when cooldown expires.
-func (p *pub) isRequestAllowed(node string) bool {
-	p.cbMu.Lock()
-	defer p.cbMu.Unlock()
-
-	cb, exists := p.cbStates[node]
-	if !exists {
-		return true // No circuit breaker state, allow request
-	}
-
-	switch cb.state {
-	case StateClosed:
-		return true
-	case StateOpen:
-		// Check if cooldown period has expired
-		if time.Since(cb.openTime) >= defaultCBResetTimeout {
-			// Transition to Half-Open to allow a single probe request
-			cb.state = StateHalfOpen
-			cb.halfOpenProbeInFlight = true // Set token for the probe
-			p.log.Info().Str("node", node).Msg("circuit breaker transitioned to half-open")
-			return true
-		}
-		return false // Still in cooldown period
-	case StateHalfOpen:
-		// In half-open state, deny requests if probe is already in flight
-		if cb.halfOpenProbeInFlight {
-			return false // Probe already in progress, deny additional requests
-		}
-		// This case should not normally happen since we set the token on transition,
-		// but handle it defensively by allowing the request and setting the token
-		cb.halfOpenProbeInFlight = true
-		return true
-	default:
-		return true
-	}
-}
-
-// recordSuccess resets the circuit breaker state to Closed on successful operation.
-// This handles Half-Open -> Closed transitions.
-func (p *pub) recordSuccess(node string) {
-	p.cbMu.Lock()
-	defer p.cbMu.Unlock()
-
-	cb, exists := p.cbStates[node]
-	if !exists {
-		// Initialize circuit breaker state
-		p.cbStates[node] = &circuitState{
-			state:               StateClosed,
-			consecutiveFailures: 0,
-		}
-		return
-	}
-
-	// Reset to closed state
-	cb.state = StateClosed
-	cb.consecutiveFailures = 0
-	cb.lastFailureTime = time.Time{}
-	cb.openTime = time.Time{}
-	cb.halfOpenProbeInFlight = false // Clear probe token
-}
-
-// recordFailure updates the circuit breaker state on failed operation.
-// Only records failures for transient/internal errors that should count toward opening the circuit.
-func (p *pub) recordFailure(node string, err error) {
-	// Only record failure if the error is transient or internal
-	if !isTransientError(err) && !isInternalError(err) {
-		return
-	}
-	p.cbMu.Lock()
-	defer p.cbMu.Unlock()
-
-	cb, exists := p.cbStates[node]
-	if !exists {
-		// Initialize circuit breaker state
-		cb = &circuitState{
-			state:               StateClosed,
-			consecutiveFailures: 1,
-			lastFailureTime:     time.Now(),
-		}
-		p.cbStates[node] = cb
-	} else {
-		cb.consecutiveFailures++
-		cb.lastFailureTime = time.Now()
-	}
-
-	// Check if we should open the circuit
-	threshold := defaultCBThreshold
-	if cb.consecutiveFailures >= threshold && cb.state == StateClosed {
-		cb.state = StateOpen
-		cb.openTime = time.Now()
-		p.log.Warn().Str("node", node).Int("failures", cb.consecutiveFailures).Msg("circuit breaker opened")
-	} else if cb.state == StateHalfOpen {
-		// Failed during half-open, go back to open
-		cb.state = StateOpen
-		cb.openTime = time.Now()
-		cb.halfOpenProbeInFlight = false // Clear probe token
-		p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure")
-	}
-}
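The retryPolicy string kept in client.go above is a standard gRPC service config; the usual way such a config takes effect is as a dial option. A minimal sketch, assuming nothing about the actual dial site in client.go (the target address and credentials below are placeholders):

    // Hypothetical dial site: method-level retries come from the service config.
    // Requires "google.golang.org/grpc" and "google.golang.org/grpc/credentials/insecure".
    conn, err := grpc.NewClient(
    	"node-1:17912", // placeholder target
    	grpc.WithDefaultServiceConfig(retryPolicy),
    	grpc.WithTransportCredentials(insecure.NewCredentials()), // placeholder creds
    )
    if err != nil {
    	return err
    }
    defer conn.Close()

grpc.WithDefaultServiceConfig and the credentials/insecure package are part of grpc-go; only their pairing with this retryPolicy is an assumption, since the dial code is not shown in this diff.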
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index 5e614831..d6c8237d 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -273,7 +273,7 @@ var _ = ginkgo.Describe("Publish and Broadcast", func() {
 			node2 := getDataNode("node2", addr2)
 			p.OnAddOrUpdate(node2)
 
-			ff, err := p.Broadcast(15*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
+			ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{}))
 			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 			gomega.Expect(ff).Should(gomega.HaveLen(2))
 			for i := range ff {
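As a sanity check on the backoff arithmetic in the retry.go file below: with defaultBackoffBase = 500ms, attempt 2 doubles twice to 2s, and defaultJitterFactor = 0.2 bounds the jittered result to [1.6s, 2.4s]; once doubling passes defaultBackoffMax, the delay is clamped to 30s. These figures follow directly from the constants and the jitteredBackoff formula in the new file.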
diff --git a/banyand/queue/pub/retry.go b/banyand/queue/pub/retry.go
new file mode 100644
index 00000000..1adfa0e2
--- /dev/null
+++ b/banyand/queue/pub/retry.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+	"crypto/rand"
+	"errors"
+	"math/big"
+	"time"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+const (
+	defaultJitterFactor      = 0.2
+	defaultMaxRetries        = 3
+	defaultPerRequestTimeout = 2 * time.Second
+	defaultBackoffBase       = 500 * time.Millisecond
+	defaultBackoffMax        = 30 * time.Second
+)
+
+var (
+	// Retry policy for health check.
+	initBackoff = time.Second
+	maxBackoff  = 20 * time.Second
+
+	// Retryable gRPC status codes for streaming send retries.
+	retryableCodes = map[codes.Code]bool{
+		codes.OK:                 false,
+		codes.Canceled:           false,
+		codes.Unknown:            false,
+		codes.InvalidArgument:    false,
+		codes.DeadlineExceeded:   true, // Retryable - operation exceeded deadline
+		codes.NotFound:           false,
+		codes.AlreadyExists:      false,
+		codes.PermissionDenied:   false,
+		codes.ResourceExhausted:  true, // Retryable - server resource limits exceeded
+		codes.FailedPrecondition: false,
+		codes.Aborted:            false,
+		codes.OutOfRange:         false,
+		codes.Unimplemented:      false,
+		codes.Internal:           true, // Retryable - internal server error should participate in circuit breaker
+		codes.Unavailable:        true, // Retryable - service temporarily unavailable
+		codes.DataLoss:           false,
+		codes.Unauthenticated:    false,
+	}
+)
+
+// secureRandFloat64 generates a cryptographically secure random float64 in [0, 1).
+func secureRandFloat64() float64 {
+	// Generate a random uint64
+	maxVal := big.NewInt(1 << 53) // Use 53 bits for precision similar to math/rand
+	n, err := rand.Int(rand.Reader, maxVal)
+	if err != nil {
+		// Fallback to a reasonable value if crypto/rand fails
+		return 0.5
+	}
+	return float64(n.Uint64()) / float64(1<<53)
+}
+
+// jitteredBackoff calculates backoff duration with jitter to avoid thundering herds.
+// Uses bounded symmetric jitter: backoff * (1 + jitter * (rand() - 0.5) * 2).
+func jitteredBackoff(baseBackoff, maxBackoff time.Duration, attempt int, jitterFactor float64) time.Duration {
+	if jitterFactor < 0 {
+		jitterFactor = 0
+	}
+	if jitterFactor > 1 {
+		jitterFactor = 1
+	}
+
+	// Exponential backoff: base * 2^attempt
+	backoff := baseBackoff
+	for i := 0; i < attempt; i++ {
+		backoff *= 2
+		if backoff > maxBackoff {
+			backoff = maxBackoff
+			break
+		}
+	}
+
+	// Apply jitter: backoff * (1 + jitter * (rand() - 0.5) * 2)
+	// This gives us a range of [backoff * (1-jitter), backoff * (1+jitter)]
+	jitterRange := float64(backoff) * jitterFactor
+	randomFloat := secureRandFloat64()
+	randomOffset := (randomFloat - 0.5) * 2 * jitterRange
+
+	jitteredDuration := time.Duration(float64(backoff) + randomOffset)
+	if jitteredDuration < 0 {
+		jitteredDuration = baseBackoff / 10 // Minimum backoff
+	}
+	if jitteredDuration > maxBackoff {
+		jitteredDuration = maxBackoff
+	}
+
+	return jitteredDuration
+}
+
+// isTransientError checks if the error is considered transient and retryable.
+func isTransientError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Handle gRPC status errors
+	if s, ok := status.FromError(err); ok {
+		return retryableCodes[s.Code()]
+	}
+
+	// Handle common.Error types
+	var ce *common.Error
+	if errors.As(err, &ce) {
+		// Map common status to gRPC codes for consistency
+		switch ce.Status() {
+		case modelv1.Status_STATUS_INTERNAL_ERROR:
+			return retryableCodes[codes.Internal]
+		default:
+			return false
+		}
+	}
+
+	return false
+}
+
+// isInternalError checks if the error is an internal server error.
+func isInternalError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Handle gRPC status errors
+	if s, ok := status.FromError(err); ok {
+		return s.Code() == codes.Internal
+	}
+
+	// Handle common.Error types
+	var ce *common.Error
+	if errors.As(err, &ce) {
+		return ce.Status() == modelv1.Status_STATUS_INTERNAL_ERROR
+	}
+
+	return false
+}
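Putting the pieces together, the intended send path retries only errors that isTransientError admits, sleeping a jittered backoff between attempts. A minimal sketch of such a loop built from the helpers above, assuming the defaults in this file (retryTransient and op are illustrative names; the real call sites are not part of this diff):

    // Hypothetical bounded retry loop over a single operation.
    // Needs "context" in addition to this file's imports.
    func retryTransient(ctx context.Context, op func(context.Context) error) error {
    	var err error
    	for attempt := 0; attempt <= defaultMaxRetries; attempt++ {
    		attemptCtx, cancel := context.WithTimeout(ctx, defaultPerRequestTimeout)
    		err = op(attemptCtx)
    		cancel()
    		if err == nil || !isTransientError(err) {
    			return err // success, or a failure not worth retrying
    		}
    		if attempt == defaultMaxRetries {
    			break
    		}
    		select {
    		case <-time.After(jitteredBackoff(defaultBackoffBase, defaultBackoffMax, attempt, defaultJitterFactor)):
    		case <-ctx.Done():
    			return ctx.Err()
    		}
    	}
    	return err
    }

Bounding each attempt with defaultPerRequestTimeout keeps a stuck call from consuming the whole retry budget, and the jitter spreads concurrent retries so a recovering node is not immediately re-saturated.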