This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch pub/perf in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d22ec2fc4081960f7c2f0f89ab39d6362d8bf43b Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Sep 4 04:36:49 2025 +0800 Add circuit breaker implementation for client health management - Introduced a circuit breaker mechanism to manage client health checks, allowing for better handling of failures and recovery. - Added `CircuitState` type and associated logic to track the state of each client connection. - Implemented methods to record successes and failures, adjusting the circuit state accordingly. - Added the `isRequestAllowed` function to determine request eligibility based on the circuit breaker state. - Updated the `pub` struct to include a map for circuit breaker states, improving resilience in client operations. --- banyand/queue/pub/client.go | 103 ++++++++++++++++++++++++++++++++++++++++++++ banyand/queue/pub/pub.go | 9 ++-- 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index a0a68a6e..de22b497 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -60,6 +60,23 @@ var ( }]}`, serviceName) ) +// CircuitState defines the circuit breaker states. +type CircuitState int + +const ( + StateClosed CircuitState = iota // Normal operation + StateOpen // Reject requests until cooldown expires + StateHalfOpen // Allow a single probe +) + +// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC clients/conns. 
+type circuitState struct { + lastFailureTime time.Time + openTime time.Time + state CircuitState + consecutiveFailures int +} + type client struct { client clusterv1.ServiceClient conn *grpc.ClientConn @@ -131,6 +148,8 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { c := clusterv1.NewServiceClient(conn) p.active[name] = &client{conn: conn, client: c, md: md} p.addClient(md) + // Initialize or reset circuit breaker state to closed + p.recordSuccess(name) p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue") } @@ -279,6 +298,8 @@ func (p *pub) checkClientHealthAndReconnect(conn *grpc.ClientConn, md schema.Met p.active[name] = &client{conn: connEvict, client: c, md: md} p.addClient(md) delete(p.evictable, name) + // Reset circuit breaker state to closed + p.recordSuccess(name) p.log.Info().Str("status", p.dump()).Stringer("node", en.n).Msg("node is healthy, move it back to active queue") }() return @@ -417,6 +438,8 @@ func (p *pub) checkWritable(n string, topic bus.Topic) (bool, *common.Error) { if !okCur { return } + // Record success for circuit breaker + p.recordSuccess(nodeName) h.OnAddOrUpdate(nodeCur.md) }() return @@ -471,3 +494,83 @@ type evictNode struct { n *databasev1.Node c chan struct{} } + +// isRequestAllowed checks if a request to the given node is allowed based on circuit breaker state. 
+func (p *pub) isRequestAllowed(node string) bool { + p.cbMu.RLock() + defer p.cbMu.RUnlock() + + cb, exists := p.cbStates[node] + if !exists { + return true // No circuit breaker state, allow request + } + + switch cb.state { + case StateClosed: + return true + case StateOpen: + // Check if cooldown period has expired + return time.Since(cb.openTime) >= 60*time.Second // TODO: make configurable + case StateHalfOpen: + // Allow only one probe request in half-open state + // This is a simple implementation - in production, we'd need atomic operations + return true + default: + return true + } +} + +// recordSuccess resets the circuit breaker state to Closed on successful operation. +func (p *pub) recordSuccess(node string) { + p.cbMu.Lock() + defer p.cbMu.Unlock() + + cb, exists := p.cbStates[node] + if !exists { + // Initialize circuit breaker state + p.cbStates[node] = &circuitState{ + state: StateClosed, + consecutiveFailures: 0, + } + return + } + + // Reset to closed state + cb.state = StateClosed + cb.consecutiveFailures = 0 + cb.lastFailureTime = time.Time{} + cb.openTime = time.Time{} +} + +// recordFailure updates the circuit breaker state on failed operation. 
+func (p *pub) recordFailure(node string) { + p.cbMu.Lock() + defer p.cbMu.Unlock() + + cb, exists := p.cbStates[node] + if !exists { + // Initialize circuit breaker state + cb = &circuitState{ + state: StateClosed, + consecutiveFailures: 1, + lastFailureTime: time.Now(), + } + p.cbStates[node] = cb + } else { + cb.consecutiveFailures++ + cb.lastFailureTime = time.Now() + } + + // Check if we should open the circuit + threshold := 5 // TODO: make configurable + if cb.consecutiveFailures >= threshold && cb.state == StateClosed { + cb.state = StateOpen + cb.openTime = time.Now() + p.log.Warn().Str("node", node).Int("failures", cb.consecutiveFailures).Msg("circuit breaker opened") + } else if cb.state == StateHalfOpen { + // Failed during half-open, go back to open + cb.state = StateOpen + cb.openTime = time.Now() + p.log.Warn().Str("node", node).Msg("circuit breaker reopened after half-open failure") + } +} diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 987931db..d8eaa07b 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -64,17 +64,19 @@ var ( type pub struct { schema.UnimplementedOnInitHandler metadata metadata.Repo - handlers map[bus.Topic]schema.EventHandler - log *logger.Logger + writableProbe map[string]map[string]struct{} + cbStates map[string]*circuitState registered map[string]*databasev1.Node active map[string]*client evictable map[string]evictNode closer *run.Closer - writableProbe map[string]map[string]struct{} + handlers map[bus.Topic]schema.EventHandler + log *logger.Logger caCertPath string prefix string allowedRoles []databasev1.Role mu sync.RWMutex + cbMu sync.RWMutex writableProbeMu sync.Mutex tlsEnabled bool } @@ -290,6 +292,7 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) queue.Client { allowedRoles: roles, prefix: strBuilder.String(), writableProbe: make(map[string]map[string]struct{}), + cbStates: make(map[string]*circuitState), } return p }