This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch pub/perf
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d22ec2fc4081960f7c2f0f89ab39d6362d8bf43b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Sep 4 04:36:49 2025 +0800

    Add circuit breaker implementation for client health management
    
    - Introduced a circuit breaker mechanism to manage client health checks, allowing for better handling of failures and recovery.
    - Added the `CircuitState` type and associated logic to track the state of each client connection.
    - Implemented methods to record successes and failures, adjusting the circuit state accordingly.
    - Added the `isRequestAllowed` function to determine request eligibility based on the circuit breaker state.
    - Updated the `pub` struct to include a map of circuit breaker states, improving resilience in client operations.
---
 banyand/queue/pub/client.go | 103 ++++++++++++++++++++++++++++++++++++++++++++
 banyand/queue/pub/pub.go    |   9 ++--
 2 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index a0a68a6e..de22b497 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -60,6 +60,23 @@ var (
        }]}`, serviceName)
 )
 
// CircuitState enumerates the states of a per-node circuit breaker.
type CircuitState int

const (
	// StateClosed is normal operation: requests are allowed through.
	StateClosed CircuitState = iota
	// StateOpen rejects requests until the cooldown expires.
	StateOpen
	// StateHalfOpen allows a single probe request to test whether the node recovered.
	StateHalfOpen
)

// String returns a short lowercase name for the state, suitable for logs.
// Values outside the declared constants render as "unknown".
func (s CircuitState) String() string {
	switch s {
	case StateClosed:
		return "closed"
	case StateOpen:
		return "open"
	case StateHalfOpen:
		return "half-open"
	default:
		return "unknown"
	}
}
+
+// circuitState holds circuit breaker metadata; it does NOT duplicate gRPC 
clients/conns.
+type circuitState struct {
+       lastFailureTime     time.Time
+       openTime            time.Time
+       state               CircuitState
+       consecutiveFailures int
+}
+
 type client struct {
        client clusterv1.ServiceClient
        conn   *grpc.ClientConn
@@ -131,6 +148,8 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
        c := clusterv1.NewServiceClient(conn)
        p.active[name] = &client{conn: conn, client: c, md: md}
        p.addClient(md)
+       // Initialize or reset circuit breaker state to closed
+       p.recordSuccess(name)
        p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new 
node is healthy, add it to active queue")
 }
 
@@ -279,6 +298,8 @@ func (p *pub) checkClientHealthAndReconnect(conn 
*grpc.ClientConn, md schema.Met
                                                p.active[name] = &client{conn: 
connEvict, client: c, md: md}
                                                p.addClient(md)
                                                delete(p.evictable, name)
+                                               // Reset circuit breaker state 
to closed
+                                               p.recordSuccess(name)
                                                p.log.Info().Str("status", 
p.dump()).Stringer("node", en.n).Msg("node is healthy, move it back to active 
queue")
                                        }()
                                        return
@@ -417,6 +438,8 @@ func (p *pub) checkWritable(n string, topic bus.Topic) 
(bool, *common.Error) {
                                                if !okCur {
                                                        return
                                                }
+                                               // Record success for circuit 
breaker
+                                               p.recordSuccess(nodeName)
                                                h.OnAddOrUpdate(nodeCur.md)
                                        }()
                                        return
@@ -471,3 +494,83 @@ type evictNode struct {
        n *databasev1.Node
        c chan struct{}
 }
+
+// isRequestAllowed checks if a request to the given node is allowed based on 
circuit breaker state.
+func (p *pub) isRequestAllowed(node string) bool {
+       p.cbMu.RLock()
+       defer p.cbMu.RUnlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               return true // No circuit breaker state, allow request
+       }
+
+       switch cb.state {
+       case StateClosed:
+               return true
+       case StateOpen:
+               // Check if cooldown period has expired
+               return time.Since(cb.openTime) >= 60*time.Second // TODO: make 
configurable
+       case StateHalfOpen:
+               // Allow only one probe request in half-open state
+               // This is a simple implementation - in production, we'd need 
atomic operations
+               return true
+       default:
+               return true
+       }
+}
+
+// recordSuccess resets the circuit breaker state to Closed on successful 
operation.
+func (p *pub) recordSuccess(node string) {
+       p.cbMu.Lock()
+       defer p.cbMu.Unlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               // Initialize circuit breaker state
+               p.cbStates[node] = &circuitState{
+                       state:               StateClosed,
+                       consecutiveFailures: 0,
+               }
+               return
+       }
+
+       // Reset to closed state
+       cb.state = StateClosed
+       cb.consecutiveFailures = 0
+       cb.lastFailureTime = time.Time{}
+       cb.openTime = time.Time{}
+}
+
+// recordFailure updates the circuit breaker state on failed operation.
+func (p *pub) recordFailure(node string) {
+       p.cbMu.Lock()
+       defer p.cbMu.Unlock()
+
+       cb, exists := p.cbStates[node]
+       if !exists {
+               // Initialize circuit breaker state
+               cb = &circuitState{
+                       state:               StateClosed,
+                       consecutiveFailures: 1,
+                       lastFailureTime:     time.Now(),
+               }
+               p.cbStates[node] = cb
+       } else {
+               cb.consecutiveFailures++
+               cb.lastFailureTime = time.Now()
+       }
+
+       // Check if we should open the circuit
+       threshold := 5 // TODO: make configurable
+       if cb.consecutiveFailures >= threshold && cb.state == StateClosed {
+               cb.state = StateOpen
+               cb.openTime = time.Now()
+               p.log.Warn().Str("node", node).Int("failures", 
cb.consecutiveFailures).Msg("circuit breaker opened")
+       } else if cb.state == StateHalfOpen {
+               // Failed during half-open, go back to open
+               cb.state = StateOpen
+               cb.openTime = time.Now()
+               p.log.Warn().Str("node", node).Msg("circuit breaker reopened 
after half-open failure")
+       }
+}
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 987931db..d8eaa07b 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -64,17 +64,19 @@ var (
 type pub struct {
        schema.UnimplementedOnInitHandler
        metadata        metadata.Repo
-       handlers        map[bus.Topic]schema.EventHandler
-       log             *logger.Logger
+       writableProbe   map[string]map[string]struct{}
+       cbStates        map[string]*circuitState
        registered      map[string]*databasev1.Node
        active          map[string]*client
        evictable       map[string]evictNode
        closer          *run.Closer
-       writableProbe   map[string]map[string]struct{}
+       handlers        map[bus.Topic]schema.EventHandler
+       log             *logger.Logger
        caCertPath      string
        prefix          string
        allowedRoles    []databasev1.Role
        mu              sync.RWMutex
+       cbMu            sync.RWMutex
        writableProbeMu sync.Mutex
        tlsEnabled      bool
 }
@@ -290,6 +292,7 @@ func New(metadata metadata.Repo, roles ...databasev1.Role) 
queue.Client {
                allowedRoles:  roles,
                prefix:        strBuilder.String(),
                writableProbe: make(map[string]map[string]struct{}),
+               cbStates:      make(map[string]*circuitState),
        }
        return p
 }

Reply via email to