hanahmily commented on code in PR #918:
URL: 
https://github.com/apache/skywalking-banyandb/pull/918#discussion_r2661303856


##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff and automatic 
retries.
+func (cm *ConnManager) doReconnect(ctx context.Context) ConnResult {
+       var lastErr error
+       for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
+               cm.stateMu.RLock()
+               retryInterval := cm.retryInterval
+               cm.stateMu.RUnlock()

Review Comment:
   ```suggestion
        var lastErr error
        retryInterval := cm.retryInterval
        for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
                
   ```



##########
fodc/agent/internal/proxy/conn_manager.go:
##########


Review Comment:
   Convert all types in the file to unexported, since there are no references 
outside the package.



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff and automatic 
retries.
+func (cm *ConnManager) doReconnect(ctx context.Context) ConnResult {
+       var lastErr error
+       for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
+               cm.stateMu.RLock()
+               retryInterval := cm.retryInterval
+               cm.stateMu.RUnlock()
+
+               // Wait before attempting connection
+               select {
+               case <-ctx.Done():
+                       return ConnResult{Error: ctx.Err()}
+               case <-cm.closer.CloseNotify():
+                       return ConnResult{Error: fmt.Errorf("connection manager 
stopped")}
+               case <-time.After(retryInterval):

Review Comment:
   Why wait a while before connecting?



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff and automatic 
retries.
+func (cm *ConnManager) doReconnect(ctx context.Context) ConnResult {
+       var lastErr error
+       for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
+               cm.stateMu.RLock()
+               retryInterval := cm.retryInterval
+               cm.stateMu.RUnlock()
+
+               // Wait before attempting connection
+               select {
+               case <-ctx.Done():
+                       return ConnResult{Error: ctx.Err()}
+               case <-cm.closer.CloseNotify():
+                       return ConnResult{Error: fmt.Errorf("connection manager 
stopped")}
+               case <-time.After(retryInterval):
+               }
+
+               cm.logger.Info().
+                       Dur("retry_interval", retryInterval).
+                       Int("attempt", attempt).
+                       Int("max_retries", connManagerMaxRetries).
+                       Msg("Attempting to reconnect...")
+
+               result := cm.doConnect(ctx)
+
+               if result.Error == nil {
+                       cm.logger.Info().Int("attempt", 
attempt).Msg("Reconnection succeeded")
+                       return result
+               }
+
+               lastErr = result.Error
+               if attempt < connManagerMaxRetries {
+                       cm.logger.Warn().
+                               Err(result.Error).
+                               Int("attempt", attempt).
+                               Int("remaining", connManagerMaxRetries-attempt).
+                               Msg("Reconnection attempt failed, will retry")
+
+                       cm.stateMu.Lock()
+                       cm.retryInterval *= 2
+                       if cm.retryInterval > connManagerMaxRetryInterval {
+                               cm.retryInterval = connManagerMaxRetryInterval
+                       }
+                       cm.stateMu.Unlock()
+               } else {
+                       cm.logger.Error().
+                               Err(result.Error).
+                               Int("attempt", attempt).
+                               Msg("Reconnection failed after max retries")
+               }
+       }

Review Comment:
   ```suggestion
        }
        cm.logger.Error().
                                Err(result.Error).
                                Int("attempt", attempt).
                                Msg("Reconnection failed after max retries")
   ```



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff and automatic 
retries.
+func (cm *ConnManager) doReconnect(ctx context.Context) ConnResult {
+       var lastErr error
+       for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
+               cm.stateMu.RLock()
+               retryInterval := cm.retryInterval
+               cm.stateMu.RUnlock()
+
+               // Wait before attempting connection
+               select {
+               case <-ctx.Done():
+                       return ConnResult{Error: ctx.Err()}
+               case <-cm.closer.CloseNotify():
+                       return ConnResult{Error: fmt.Errorf("connection manager 
stopped")}
+               case <-time.After(retryInterval):
+               }
+
+               cm.logger.Info().
+                       Dur("retry_interval", retryInterval).
+                       Int("attempt", attempt).
+                       Int("max_retries", connManagerMaxRetries).
+                       Msg("Attempting to reconnect...")
+
+               result := cm.doConnect(ctx)
+
+               if result.Error == nil {
+                       cm.logger.Info().Int("attempt", 
attempt).Msg("Reconnection succeeded")
+                       return result
+               }
+
+               lastErr = result.Error
+               if attempt < connManagerMaxRetries {

Review Comment:
   ```suggestion
   ```



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {
+       select {
+       case <-ctx.Done():
+               return ConnResult{Error: ctx.Err()}
+       default:
+       }
+
+       conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if dialErr != nil {
+               cm.logger.Error().Err(dialErr).Str("proxy_addr", 
cm.proxyAddr).Msg("Failed to create proxy client")
+               return ConnResult{
+                       Error: fmt.Errorf("failed to create proxy client: %w", 
dialErr),
+               }
+       }
+       cm.logger.Info().Str("proxy_addr", cm.proxyAddr).Msg("Connected to FODC 
Proxy")
+       return ConnResult{
+               Conn: conn,
+       }
+}
+
+// doReconnect performs reconnection with exponential backoff and automatic 
retries.
+func (cm *ConnManager) doReconnect(ctx context.Context) ConnResult {
+       var lastErr error
+       for attempt := 1; attempt <= connManagerMaxRetries; attempt++ {
+               cm.stateMu.RLock()
+               retryInterval := cm.retryInterval
+               cm.stateMu.RUnlock()
+
+               // Wait before attempting connection
+               select {
+               case <-ctx.Done():
+                       return ConnResult{Error: ctx.Err()}
+               case <-cm.closer.CloseNotify():
+                       return ConnResult{Error: fmt.Errorf("connection manager 
stopped")}
+               case <-time.After(retryInterval):
+               }
+
+               cm.logger.Info().
+                       Dur("retry_interval", retryInterval).
+                       Int("attempt", attempt).
+                       Int("max_retries", connManagerMaxRetries).
+                       Msg("Attempting to reconnect...")
+
+               result := cm.doConnect(ctx)
+
+               if result.Error == nil {
+                       cm.logger.Info().Int("attempt", 
attempt).Msg("Reconnection succeeded")
+                       return result
+               }
+
+               lastErr = result.Error
+               if attempt < connManagerMaxRetries {
+                       cm.logger.Warn().
+                               Err(result.Error).
+                               Int("attempt", attempt).
+                               Int("remaining", connManagerMaxRetries-attempt).
+                               Msg("Reconnection attempt failed, will retry")
+
+                       cm.stateMu.Lock()
+                       cm.retryInterval *= 2
+                       if cm.retryInterval > connManagerMaxRetryInterval {
+                               cm.retryInterval = connManagerMaxRetryInterval
+                       }
+                       cm.stateMu.Unlock()

Review Comment:
   ```suggestion
                     retryInterval *= 2
   ```



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,428 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventReconnect
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnecting indicates a connection attempt is in progress.
+       ConnStateConnecting
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+       // ConnStateReconnecting indicates a reconnection attempt is in 
progress.
+       ConnStateReconnecting
+)
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       eventCh           chan ConnEvent
+       stopCh            chan struct{}
+       doneCh            chan struct{} // Signals when run() goroutine has 
exited
+       currentConn       *grpc.ClientConn
+       proxyAddr         string
+       reconnectInterval time.Duration

Review Comment:
   You can use retryInterval to implement the repty logic without 
reconnectInterval



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -0,0 +1,748 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides a client for communicating with the FODC Proxy.
+package proxy
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "strings"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       flightrecorder 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
+       "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// MetricsRequestFilter defines filters for metrics requests.
+type MetricsRequestFilter struct {
+       StartTime *time.Time
+       EndTime   *time.Time
+}
+
+// Client manages connection and communication with the FODC Proxy.
+type Client struct {
+       connManager        *ConnManager
+       heartbeatTicker    *time.Ticker
+       flightRecorder     *flightrecorder.FlightRecorder
+       logger             *logger.Logger
+       stopCh             chan struct{}
+       labels             map[string]string
+       client             fodcv1.FODCServiceClient
+       registrationStream fodcv1.FODCService_RegisterAgentClient
+       metricsStream      fodcv1.FODCService_StreamMetricsClient
+
+       proxyAddr string
+       nodeIP    string
+       nodeRole  string
+       agentID   string
+
+       nodePort          int
+       heartbeatInterval time.Duration
+       reconnectInterval time.Duration
+       disconnected      bool
+       streamsMu         sync.RWMutex   // Protects streams only
+       heartbeatWg       sync.WaitGroup // Tracks heartbeat goroutine
+}
+
+// NewClient creates a new Client instance.
+func NewClient(
+       proxyAddr string,
+       nodeIP string,
+       nodePort int,
+       nodeRole string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder *flightrecorder.FlightRecorder,
+       logger *logger.Logger,
+) *Client {
+       connMgr := NewConnManager(proxyAddr, reconnectInterval, logger)
+       client := &Client{
+               connManager:       connMgr,
+               proxyAddr:         proxyAddr,
+               nodeIP:            nodeIP,
+               nodePort:          nodePort,
+               nodeRole:          nodeRole,
+               labels:            labels,
+               heartbeatInterval: heartbeatInterval,
+               reconnectInterval: reconnectInterval,
+               flightRecorder:    flightRecorder,
+               logger:            logger,
+               stopCh:            make(chan struct{}),
+       }
+
+       connMgr.SetHeartbeatChecker(client.SendHeartbeat)
+       return client
+}
+
+// StartConnManager is useful for tests or scenarios where you want to 
manually control connection lifecycle.
+func (c *Client) StartConnManager(ctx context.Context) {
+       c.connManager.Start(ctx)
+}
+
+// Connect establishes a gRPC connection to Proxy.
+func (c *Client) Connect(ctx context.Context) error {
+       resultCh := c.connManager.RequestConnect(ctx)
+       result := <-resultCh
+       if result.Error != nil {
+               return result.Error
+       }
+
+       c.streamsMu.Lock()
+       c.client = fodcv1.NewFODCServiceClient(result.Conn)
+       // Reset disconnected state and recreate stopCh for reconnection
+       if c.disconnected {
+               c.disconnected = false
+               c.stopCh = make(chan struct{})
+       }
+       c.streamsMu.Unlock()
+
+       return nil
+}
+
+// StartRegistrationStream establishes bi-directional registration stream with 
Proxy.
+func (c *Client) StartRegistrationStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       c.streamsMu.Unlock()
+
+       stream, streamErr := client.RegisterAgent(ctx)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create registration stream: %w", 
streamErr)
+       }
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send registration request: %w", 
sendErr)
+       }
+
+       resp, recvErr := stream.Recv()
+       if recvErr != nil {
+               return fmt.Errorf("failed to receive registration response: 
%w", recvErr)
+       }
+
+       if !resp.Success {
+               return fmt.Errorf("registration failed: %s", resp.Message)
+       }
+
+       if resp.AgentId == "" {
+               return fmt.Errorf("registration response missing agent ID")
+       }
+
+       c.streamsMu.Lock()
+       c.registrationStream = stream
+       c.agentID = resp.AgentId
+       if resp.HeartbeatIntervalSeconds > 0 {
+               c.heartbeatInterval = 
time.Duration(resp.HeartbeatIntervalSeconds) * time.Second
+       }
+       c.streamsMu.Unlock()
+
+       c.logger.Info().
+               Str("proxy_addr", c.proxyAddr).
+               Str("agent_id", resp.AgentId).
+               Dur("heartbeat_interval", c.heartbeatInterval).
+               Msg("Agent registered with Proxy")
+
+       c.startHeartbeat(ctx)
+
+       go c.handleRegistrationStream(ctx, stream)
+
+       return nil
+}
+
+// StartMetricsStream establishes bi-directional metrics stream with Proxy.
+func (c *Client) StartMetricsStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent 
first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamMetrics(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create metrics stream: %w", 
streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.metricsStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleMetricsStream(ctx, stream)
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Metrics stream established with Proxy")
+
+       return nil
+}
+
+// RetrieveAndSendMetrics retrieves metrics from Flight Recorder when 
requested by Proxy.
+func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter 
*MetricsRequestFilter) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.metricsStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("metrics stream not established")
+       }
+       metricsStream := c.metricsStream
+       c.streamsMu.RUnlock()
+
+       datasources := c.flightRecorder.GetDatasources()
+       if len(datasources) == 0 {
+               req := &fodcv1.StreamMetricsRequest{
+                       Metrics:   []*fodcv1.Metric{},
+                       Timestamp: timestamppb.Now(),
+               }
+               if sendErr := metricsStream.Send(req); sendErr != nil {
+                       return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+               }
+               return nil
+       }
+
+       ds := datasources[0]
+       allMetrics := ds.GetMetrics()
+       timestamps := ds.GetTimestamps()
+       descriptions := ds.GetDescriptions()
+
+       if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
+               if timestamps == nil {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               timestampValues := timestamps.GetAllValues()
+               if len(timestampValues) == 0 {
+                       req := &fodcv1.StreamMetricsRequest{
+                               Metrics:   []*fodcv1.Metric{},
+                               Timestamp: timestamppb.Now(),
+                       }
+                       if sendErr := metricsStream.Send(req); sendErr != nil {
+                               return fmt.Errorf("failed to send empty metrics 
response: %w", sendErr)
+                       }
+                       return nil
+               }
+
+               return c.sendFilteredMetrics(metricsStream, allMetrics, 
timestampValues, descriptions, filter)
+       }
+
+       return c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+}
+
+// sendLatestMetrics sends the latest metrics (most recent values).
+func (c *Client) sendLatestMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       descriptions map[string]string,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValue := metricBuffer.GetCurrentValue()
+               allValues := metricBuffer.GetAllValues()
+
+               if len(allValues) == 0 && metricValue == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               labelsMap := make(map[string]string)
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               protoMetric := &fodcv1.Metric{
+                       Name:        parsedKey.Name,
+                       Labels:      labelsMap,
+                       Value:       metricValue,
+                       Description: descriptions[parsedKey.Name],
+               }
+
+               protoMetrics = append(protoMetrics, protoMetric)
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// sendFilteredMetrics sends metrics filtered by time window.
+func (c *Client) sendFilteredMetrics(
+       stream fodcv1.FODCService_StreamMetricsClient,
+       allMetrics map[string]*flightrecorder.MetricRingBuffer,
+       timestampValues []int64,
+       descriptions map[string]string,
+       filter *MetricsRequestFilter,
+) error {
+       protoMetrics := make([]*fodcv1.Metric, 0)
+
+       for metricKey, metricBuffer := range allMetrics {
+               metricValues := metricBuffer.GetAllValues()
+               if len(metricValues) == 0 {
+                       continue
+               }
+
+               parsedKey, parseErr := c.parseMetricKey(metricKey)
+               if parseErr != nil {
+                       c.logger.Warn().Err(parseErr).Str("metric_key", 
metricKey).Msg("Failed to parse metric key")
+                       continue
+               }
+
+               description := descriptions[parsedKey.Name]
+
+               minLen := len(metricValues)
+               if len(timestampValues) < minLen {
+                       minLen = len(timestampValues)
+               }
+
+               labelsMap := make(map[string]string)
+               for _, label := range parsedKey.Labels {
+                       labelsMap[label.Name] = label.Value
+               }
+
+               for idx := 0; idx < minLen; idx++ {
+                       timestampUnix := timestampValues[idx]
+                       timestamp := time.Unix(timestampUnix, 0)
+
+                       if filter.StartTime != nil && 
timestamp.Before(*filter.StartTime) {
+                               continue
+                       }
+                       if filter.EndTime != nil && 
timestamp.After(*filter.EndTime) {
+                               continue
+                       }
+
+                       protoMetric := &fodcv1.Metric{
+                               Name:        parsedKey.Name,
+                               Labels:      labelsMap,
+                               Value:       metricValues[idx],
+                               Description: description,
+                               Timestamp:   timestamppb.New(timestamp),
+                       }
+
+                       protoMetrics = append(protoMetrics, protoMetric)
+               }
+       }
+
+       req := &fodcv1.StreamMetricsRequest{
+               Metrics:   protoMetrics,
+               Timestamp: timestamppb.Now(),
+       }
+
+       if sendErr := stream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send metrics: %w", sendErr)
+       }
+
+       return nil
+}
+
+// SendHeartbeat sends heartbeat to Proxy.
+func (c *Client) SendHeartbeat(_ context.Context) error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.registrationStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("registration stream not established")
+       }
+       registrationStream := c.registrationStream
+       c.streamsMu.RUnlock()
+
+       req := &fodcv1.RegisterAgentRequest{
+               NodeRole: c.nodeRole,
+               Labels:   c.labels,
+               PrimaryAddress: &fodcv1.Address{
+                       Ip:   c.nodeIP,
+                       Port: int32(c.nodePort),
+               },
+       }
+
+       if sendErr := registrationStream.Send(req); sendErr != nil {
+               // Check if error is due to stream being closed/disconnected
+               if errors.Is(sendErr, io.EOF) || errors.Is(sendErr, 
context.Canceled) {
+                       return fmt.Errorf("registration stream closed")
+               }
+               if st, ok := status.FromError(sendErr); ok {
+                       if st.Code() == codes.Canceled {
+                               return fmt.Errorf("registration stream closed")
+                       }
+               }
+               return fmt.Errorf("failed to send heartbeat: %w", sendErr)
+       }
+
+       return nil
+}
+
+// Disconnect closes connection to Proxy.
+func (c *Client) Disconnect() error {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               return nil
+       }
+
+       c.disconnected = true
+       close(c.stopCh)
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       c.streamsMu.Unlock()
+
+       // Wait for heartbeat goroutine to exit before closing streams
+       c.heartbeatWg.Wait()
+
+       c.streamsMu.Lock()
+       if c.registrationStream != nil {
+               if closeErr := c.registrationStream.CloseSend(); closeErr != 
nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
registration stream")
+               }
+               c.registrationStream = nil
+       }
+
+       if c.metricsStream != nil {
+               if closeErr := c.metricsStream.CloseSend(); closeErr != nil {
+                       c.logger.Warn().Err(closeErr).Msg("Error closing 
metrics stream")
+               }
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       c.connManager.Stop()
+
+       c.streamsMu.Lock()
+       c.client = nil
+       c.streamsMu.Unlock()
+
+       c.logger.Info().Msg("Disconnected from FODC Proxy")
+
+       return nil
+}
+
+// Start starts the proxy client with automatic reconnection.
+func (c *Client) Start(ctx context.Context) error {
+       c.connManager.Start(ctx)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               default:
+               }
+
+               if connectErr := c.Connect(ctx); connectErr != nil {
+                       c.logger.Error().Err(connectErr).Msg("Failed to connect 
to Proxy, retrying...")
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if regErr := c.StartRegistrationStream(ctx); regErr != nil {
+                       c.logger.Error().Err(regErr).Msg("Failed to start 
registration stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               if metricsErr := c.StartMetricsStream(ctx); metricsErr != nil {
+                       c.logger.Error().Err(metricsErr).Msg("Failed to start 
metrics stream, reconnecting...")
+                       if disconnectErr := c.Disconnect(); disconnectErr != 
nil {
+                               c.logger.Warn().Err(disconnectErr).Msg("Failed 
to disconnect before retry")
+                       }
+                       time.Sleep(c.reconnectInterval)
+                       continue
+               }
+
+               c.logger.Info().Msg("Proxy client started successfully")
+
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               case <-c.stopCh:
+                       return nil
+               }
+       }
+}
+
+// handleRegistrationStream handles the registration stream.
+func (c *Client) handleRegistrationStream(ctx context.Context, stream 
fodcv1.FODCService_RegisterAgentClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               _, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Registration stream closed by 
Proxy, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Registration 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Registration stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
registration stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+       }
+}
+
+// handleMetricsStream handles the metrics stream.
+func (c *Client) handleMetricsStream(ctx context.Context, stream 
fodcv1.FODCService_StreamMetricsClient) {
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.stopCh:
+                       return
+               default:
+               }
+
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Metrics stream closed by Proxy, 
reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       // Check if it's a context cancellation or deadline 
exceeded (expected errors during cleanup)
+                       if errors.Is(recvErr, context.Canceled) || 
errors.Is(recvErr, context.DeadlineExceeded) {
+                               c.logger.Debug().Err(recvErr).Msg("Metrics 
stream closed")
+                               return
+                       }
+                       if st, ok := status.FromError(recvErr); ok {
+                               // Check if it's a gRPC status error with 
expected codes
+                               code := st.Code()
+                               if code == codes.Canceled || code == 
codes.DeadlineExceeded {
+                                       
c.logger.Debug().Err(recvErr).Msg("Metrics stream closed")
+                                       return
+                               }
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from 
metrics stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+
+               filter := &MetricsRequestFilter{}
+               if resp.StartTime != nil {
+                       startTime := resp.StartTime.AsTime()
+                       filter.StartTime = &startTime
+               }
+               if resp.EndTime != nil {
+                       endTime := resp.EndTime.AsTime()
+                       filter.EndTime = &endTime
+               }
+
+               if retrieveErr := c.RetrieveAndSendMetrics(ctx, filter); 
retrieveErr != nil {
+                       c.logger.Error().Err(retrieveErr).Msg("Failed to 
retrieve and send metrics")
+               }
+       }
+}
+
+// reconnect handles automatic reconnection when streams break.
+func (c *Client) reconnect(ctx context.Context) {
+       c.streamsMu.Lock()
+       if c.disconnected {
+               c.streamsMu.Unlock()
+               c.logger.Warn().Msg("Already disconnected intentionally, 
skipping reconnection...")
+               return
+       }
+
+       c.logger.Info().Msg("Starting reconnection process...")
+
+       if c.heartbeatTicker != nil {
+               c.heartbeatTicker.Stop()
+               c.heartbeatTicker = nil
+       }
+       if c.registrationStream != nil {
+               _ = c.registrationStream.CloseSend()
+               c.registrationStream = nil
+       }
+       if c.metricsStream != nil {
+               _ = c.metricsStream.CloseSend()
+               c.metricsStream = nil
+       }
+       c.streamsMu.Unlock()
+
+       reconnectCh := c.connManager.RequestReconnect(ctx)
+       reconnectResult := <-reconnectCh

Review Comment:
   If the connection is the original one, do not recreate the client. 



##########
fodc/agent/internal/proxy/conn_manager.go:
##########
@@ -0,0 +1,361 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package proxy provides connection management for the FODC Proxy client.
+package proxy
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const (
+       connManagerMaxRetryInterval = 30 * time.Second
+       connManagerMaxRetries       = 3
+)
+
+// ConnEventType represents the type of connection event.
+type ConnEventType int
+
+// Possible connection event types.
+const (
+       ConnEventConnect ConnEventType = iota
+       ConnEventDisconnect
+)
+
+// ConnEvent represents a connection event sent to the manager.
+type ConnEvent struct {
+       ResultCh  chan<- ConnResult
+       Context   context.Context
+       Type      ConnEventType
+       Immediate bool // If true, skip backoff and retry immediately
+}
+
+// ConnResult represents the result of a connection operation.
+type ConnResult struct {
+       Conn  *grpc.ClientConn
+       Error error
+}
+
+// ConnState represents the state of the connection.
+type ConnState int
+
+const (
+       // ConnStateDisconnected indicates the connection is disconnected.
+       ConnStateDisconnected ConnState = iota
+       // ConnStateConnected indicates the connection is established.
+       ConnStateConnected
+)
+
+// HeartbeatChecker is a function that checks if the connection is healthy.
+type HeartbeatChecker func(context.Context) error
+
+// ConnManager manages connection lifecycle using a single goroutine.
+type ConnManager struct {
+       logger            *logger.Logger
+       currentConn       *grpc.ClientConn
+       closer            *run.Closer
+       heartbeatChecker  HeartbeatChecker
+       eventCh           chan ConnEvent
+       proxyAddr         string
+       reconnectInterval time.Duration
+       retryInterval     time.Duration
+       state             ConnState
+       stateMu           sync.RWMutex
+       startOnce         sync.Once
+}
+
+// NewConnManager creates a new connection manager.
+func NewConnManager(
+       proxyAddr string,
+       reconnectInterval time.Duration,
+       logger *logger.Logger,
+) *ConnManager {
+       return &ConnManager{
+               eventCh:           make(chan ConnEvent, 10),
+               closer:            run.NewCloser(1),
+               logger:            logger,
+               proxyAddr:         proxyAddr,
+               reconnectInterval: reconnectInterval,
+               state:             ConnStateDisconnected,
+               retryInterval:     reconnectInterval,
+               heartbeatChecker:  nil,
+       }
+}
+
+// SetHeartbeatChecker sets the heartbeat checker function.
+func (cm *ConnManager) SetHeartbeatChecker(checker HeartbeatChecker) {
+       cm.stateMu.Lock()
+       cm.heartbeatChecker = checker
+       cm.stateMu.Unlock()
+}
+
+// EventChannel returns the channel for sending connect/reconnect events.
+func (cm *ConnManager) EventChannel() chan<- ConnEvent {
+       return cm.eventCh
+}
+
+// Start starts the connection manager's event processing goroutine.
+func (cm *ConnManager) Start(ctx context.Context) {
+       cm.startOnce.Do(func() {
+               go cm.run(ctx)
+       })
+}
+
+// Stop stops the connection manager and closes all connections.
+func (cm *ConnManager) Stop() {
+       cm.closer.CloseThenWait()
+}
+
+// requestConnection requests a connection attempt with optional heartbeat 
check.
+func (cm *ConnManager) requestConnection(ctx context.Context, immediate bool) 
<-chan ConnResult {
+       resultCh := make(chan ConnResult, 1)
+       event := ConnEvent{
+               Type:      ConnEventConnect,
+               Context:   ctx,
+               ResultCh:  resultCh,
+               Immediate: immediate,
+       }
+       select {
+       case cm.eventCh <- event:
+       case <-ctx.Done():
+               resultCh <- ConnResult{Error: ctx.Err()}
+               close(resultCh)
+       default:
+               resultCh <- ConnResult{Error: fmt.Errorf("connection manager 
event channel is full")}
+               close(resultCh)
+       }
+       return resultCh
+}
+
+// RequestConnect requests a connection attempt.
+func (cm *ConnManager) RequestConnect(ctx context.Context) <-chan ConnResult {
+       return cm.requestConnection(ctx, true)
+}
+
+// RequestReconnect requests a reconnection attempt with exponential backoff.
+func (cm *ConnManager) RequestReconnect(ctx context.Context) <-chan ConnResult 
{
+       return cm.requestConnection(ctx, false)
+}
+
+// run is the main event processing loop running in a single goroutine.
+func (cm *ConnManager) run(ctx context.Context) {
+       defer cm.closer.Done()
+       for {
+               select {
+               case <-ctx.Done():
+                       cm.cleanup()
+                       return
+               case <-cm.closer.CloseNotify():
+                       cm.cleanup()
+                       return
+               case event := <-cm.eventCh:
+                       cm.handleEvent(ctx, event)
+               }
+       }
+}
+
+// handleEvent runs in the single goroutine and processes a connection event.
+func (cm *ConnManager) handleEvent(ctx context.Context, event ConnEvent) {
+       switch event.Type {
+       case ConnEventDisconnect:
+               cm.stateMu.Lock()
+               cm.state = ConnStateDisconnected
+               cm.stateMu.Unlock()
+
+               cm.cleanupConnection()
+               if event.ResultCh != nil {
+                       event.ResultCh <- ConnResult{Error: nil}
+                       close(event.ResultCh)
+               }
+       case ConnEventConnect:
+               cm.stateMu.RLock()
+               currentState := cm.state
+               currentConn := cm.currentConn
+               heartbeatChecker := cm.heartbeatChecker
+               cm.stateMu.RUnlock()
+
+               if currentState == ConnStateConnected && currentConn != nil && 
heartbeatChecker != nil {
+                       checkCtx, checkCancel := context.WithTimeout(ctx, 
5*time.Second)
+                       heartbeatErr := heartbeatChecker(checkCtx)
+                       checkCancel()
+
+                       if heartbeatErr == nil {
+                               cm.logger.Debug().Msg("Connection is healthy, 
reusing existing connection")
+                               if event.ResultCh != nil {
+                                       event.ResultCh <- ConnResult{Conn: 
currentConn}
+                                       close(event.ResultCh)
+                               }
+                               return
+                       }
+                       cm.logger.Warn().Err(heartbeatErr).Msg("Heartbeat check 
failed, reconnecting")
+               }
+               // For reconnect cleanup old connection first
+               if currentState == ConnStateConnected {
+                       cm.cleanupConnection()
+               }
+
+               connCtx, connCancel := context.WithCancel(ctx)
+               defer connCancel()
+               if event.Context != nil {
+                       go func() {
+                               select {
+                               case <-event.Context.Done():
+                                       connCancel()
+                               case <-connCtx.Done():
+                               }
+                       }()
+               }
+
+               var result ConnResult
+               if !event.Immediate {
+                       result = cm.doReconnect(connCtx)
+               } else {
+                       result = cm.doConnect(connCtx)
+               }
+
+               cm.stateMu.Lock()
+               if result.Error == nil {
+                       cm.state = ConnStateConnected
+                       cm.currentConn = result.Conn
+                       cm.retryInterval = cm.reconnectInterval
+               } else {
+                       cm.state = ConnStateDisconnected
+               }
+               cm.stateMu.Unlock()
+
+               if event.ResultCh != nil {
+                       event.ResultCh <- result
+                       close(event.ResultCh)
+               }
+       }
+}
+
+// doConnect performs the actual connection attempt.
+func (cm *ConnManager) doConnect(ctx context.Context) ConnResult {

Review Comment:
   Merge doConnect and doReconnect into a single doConnect function that 
supports retry. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to