wu-sheng commented on code in PR #911:
URL:
https://github.com/apache/skywalking-banyandb/pull/911#discussion_r2640335453
##########
docs/design/fodc/proxy.md:
##########
@@ -0,0 +1,941 @@
+# FODC Proxy Development Design
+
+## Overview
+
+The FODC Proxy is the central control plane and data aggregator for the First
Occurrence Data Collection (FODC) infrastructure. It acts as a unified gateway
that aggregates observability data from multiple FODC Agents (each co-located
with a BanyanDB node) and exposes ecosystem-friendly interfaces to external
systems such as Prometheus and other observability platforms.
+
+The Proxy provides:
+
+1. **Agent Management**: Registration, health monitoring, and lifecycle
management of FODC Agents
+2. **Metrics Aggregation**: Collects and aggregates metrics from all agents
with enriched metadata
+3. **Cluster Topology**: Maintains an up-to-date view of cluster topology,
roles, and node states
+4. **Configuration Collection**: Aggregates and exposes node configurations
for consistency verification
+
+### Responsibilities
+
+**FODC Proxy Core Responsibilities**
+- Accept bi-directional gRPC connections from FODC Agents
+- Register and track agent lifecycle (online/offline, heartbeat monitoring)
+- Aggregate metrics from all agents with node metadata enrichment
+- Maintain cluster topology view based on agent registrations
+- Collect and expose node configurations for audit and consistency checks
+- Expose unified REST/Prometheus-style interfaces for external consumption
+- Provide proxy-level metrics (health, agent count, RPC latency, etc.)
+
+## Component Design
+
+### 1. Proxy Components
+
+#### 1.1 Agent Registry Component
+
+**Purpose**: Manages the lifecycle and state of all connected FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Registration**: Accepts agent registration requests via gRPC
+- **Health Monitoring**: Tracks agent heartbeat and connection status
+- **State Management**: Maintains agent state (online/offline, last heartbeat
time)
+- **Topology Building**: Aggregates agent registrations into cluster topology
view
+- **Connection Management**: Handles connection failures, reconnections, and
cleanup
+
+##### Core Types
+
+**`AgentInfo`**
+```go
+type AgentInfo struct {
+ NodeID string // Unique node identifier
+ NodeRole databasev1.Role // Node role (liaison,
datanode-hot, etc.)
+ PrimaryAddress Address // Primary agent gRPC address
+ SecondaryAddresses map[string]Address // Secondary addresses with
names (e.g., "metrics": Address, "gossip": Address)
+ Labels map[string]string // Node labels/metadata
+ RegisteredAt time.Time // Registration timestamp
+ LastHeartbeat time.Time // Last heartbeat timestamp
+ Status AgentStatus // Current agent status
+}
+
+type Address struct {
+ IP string
+ Port int
+}
+
+type AgentStatus string
+
+const (
+ AgentStatusOnline AgentStatus = "online"
+ AgentStatusOffline AgentStatus = "unconnected"
+)
+```
+
+**`AgentRegistry`**
+```go
+type AgentRegistry struct {
+ agents map[AgentKey]*AgentInfo // Map from agent key
(IP+port+role+labels) to agent info
+ mu sync.RWMutex // Protects agents map
+ logger *logger.Logger
+ heartbeatTimeout time.Duration // Timeout for considering agent
offline
+}
+
+type AgentKey struct {
+ IP string // Primary IP address
+ Port int // Primary port
+ Role databasev1.Role // Node role
+ Labels map[string]string // Node labels (used for key matching)
+}
+```
+
+##### Key Functions
+
+**`RegisterAgent(ctx context.Context, info *AgentInfo) error`**
+- Registers a new agent or updates existing agent information
+- Creates AgentKey from primary IP + port + role + labels
+- Uses AgentKey as the map key (not nodeID)
+- Validates primary address and role
+- Updates topology view
+- Returns error if registration fails
+
+**`UnregisterAgent(key AgentKey) error`**
+- Removes agent from registry using AgentKey
+- Cleans up associated resources
+- Updates topology view
+- Called in the following scenarios:
+ - When agent's registration stream closes (connection lost)
+ - When agent's all streams are closed and connection is terminated
+ - When agent has been offline longer than `--agent-cleanup-timeout`
(detected by heartbeat health check)
+ - During graceful shutdown or manual agent removal
+ - When agent explicitly requests unregistration via stream
+
+**`UpdateHeartbeat(key AgentKey) error`**
+- Updates last heartbeat timestamp for agent using AgentKey
+- Marks agent as online if it was offline
+- Returns error if agent not found
+
+**`GetAgent(ip string, port int, role databasev1.Role, labels
map[string]string) (*AgentInfo, error)`**
+- Retrieves agent information by primary IP + port + role + labels
+- Creates AgentKey from the provided parameters
+- Looks up agent in registry using AgentKey
+- Returns error if agent not found
+
+**`ListAgents() []*AgentInfo`**
+- Returns list of all registered agents
+- Thread-safe read operation
+
+**`ListAgentsByRole(role databasev1.Role) []*AgentInfo`**
+- Returns agents filtered by role
+- Useful for role-specific operations
+
+**`CheckAgentHealth() error`**
+- Periodically checks agent health based on heartbeat timeout
+- Marks agents as offline if heartbeat timeout exceeded
+- Continuously runs heartbeat health checks regardless of cleanup timeout
+- Unregisters agents that have been offline longer than
`--agent-cleanup-timeout`
+- Agents that cannot maintain connection will be removed after the cleanup
timeout period
+- Returns aggregated health status
+
+##### Configuration Flags
+
+**`--agent-heartbeat-timeout`**
+- **Type**: `duration`
+- **Default**: `30s`
+- **Description**: Timeout for considering an agent offline if no heartbeat
received
+
+**`--max-agents`**
+- **Type**: `int`
+- **Default**: `1000`
+- **Description**: Maximum number of agents that can be registered
+
+**`--agent-cleanup-timeout`**
+- **Type**: `duration`
+- **Default**: `5m`
+- **Minimum**: Must be greater than `--agent-heartbeat-timeout`
+- **Description**: Timeout for automatically unregistering agents that have
been offline. Agents that cannot maintain connection will be removed after
being offline longer than this timeout. The heartbeat health check continues
running regardless of this timeout. This timeout must be greater than
`--agent-heartbeat-timeout` to allow for proper health checking.
+
+#### 1.2 gRPC Server Component
+
+**Purpose**: Handles bi-directional gRPC communication with FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Connection Handling**: Accepts and manages gRPC connections from
agents
+- **Streaming Support**: Supports bi-directional streaming for metrics
+- **Protocol Implementation**: Implements FODC gRPC service protocol
+- **Connection Lifecycle**: Manages connection establishment, maintenance, and
cleanup
+
+##### Core Types
+
+**`FODCService`** (gRPC Service Implementation)
+```go
+type FODCService struct {
+ registry *AgentRegistry
+ metricsAggregator *MetricsAggregator
+ logger *logger.Logger
+}
+
+// Example gRPC service methods (to be defined in proto)
+// RegisterAgent(stream RegisterAgentRequest) (stream RegisterAgentResponse)
error
+// StreamMetrics(stream StreamMetricsRequest) (stream StreamMetricsResponse)
error
+```
+
+**`AgentConnection`**
+```go
+type AgentConnection struct {
+ Key AgentKey // Agent key (IP+port+role+labels)
for registry lookup
+ NodeID string
+ Stream grpc.ServerStream
+ Context context.Context
+ Cancel context.CancelFunc
+ LastActivity time.Time
+}
+```
+
+##### Key Functions
+
+**`RegisterAgent(stream FODCService_RegisterAgentServer) error`**
+- Handles bi-directional agent registration stream
+- Receives registration requests from agent (includes primary address, role,
labels)
+- Creates AgentKey from primary IP + port + role + labels
+- Validates registration information
+- Registers agent with AgentRegistry using AgentKey
+- Sends registration responses
+- Maintains stream for heartbeat and re-registration
+
+**`StreamMetrics(stream FODCService_StreamMetricsServer) error`**
+- Handles bi-directional metrics streaming
+- Sends metrics requests from Proxy to agent (on-demand collection)
+- Receives metrics from agent at Proxy in response to requests
+- Proxy initiates by sending a metrics request via StreamMetricsResponse when
an external client queries metrics
+- Agent responds with StreamMetricsRequest containing the collected metrics
+- Manages stream lifecycle
+
+##### Connection Lifecycle Management
+
+**Stream Closure Handling**
+- When a stream closes (due to network error, agent shutdown, or timeout), the
gRPC server should:
+ 1. Detect stream closure via context cancellation or stream error
+ 2. Extract agent key (primary IP + port + role + labels) from the connection
+ 3. Check if this is the last active stream for the agent
+ 4. If all streams are closed, call `AgentRegistry.UnregisterAgent(agentKey)`
to clean up
+ 5. Update topology to reflect agent offline status
+
+**Graceful vs. Ungraceful Disconnection**
+- **Graceful**: Agent sends explicit disconnect message before closing stream
→ immediate unregistration
+- **Ungraceful**: Stream closes unexpectedly → unregistration happens after
detecting all streams closed
+- **Heartbeat Timeout**: Agent marked offline by `CheckAgentHealth()` →
unregistration occurs after being offline longer than
`--agent-cleanup-timeout`. Heartbeat health checks continue running regardless.
+
+##### Configuration Flags
+
+**`--grpc-listen-addr`**
+- **Type**: `string`
+- **Default**: `:17900`
+- **Description**: gRPC server address where the Proxy listens for agent
connections
+
+**`--grpc-max-msg-size`**
+- **Type**: `int`
+- **Default**: `4194304` (4MB)
+- **Description**: Maximum message size for gRPC messages
+
+#### 1.3 Metrics Aggregator Component
+
+**Purpose**: Aggregates and enriches metrics from all agents
+
+##### Core Responsibilities
+
+- **On-Demand Metrics Collection**: Requests metrics from agents via gRPC
streams when external clients query metrics
+- **Metrics Request Coordination**: Coordinates metrics requests to multiple
agents concurrently
+- **Metadata Enrichment**: Adds node metadata (role, ID, labels) to metrics
+- **Normalization**: Normalizes metric formats and labels across agents
+- **Time Window Filtering**: Filters metrics by time window when requested by
external clients (agents filter based on Flight Recorder data)
+
+##### Core Types
+
+**`AggregatedMetric`**
+```go
+type AggregatedMetric struct {
+ Name string // Metric name
+ Labels map[string]string // Metric labels (including node
metadata)
+ Value float64 // Metric value
+ Timestamp time.Time // Metric timestamp
+ NodeID string // Source node ID
+ NodeRole databasev1.Role // Source node role
+ Description string // Metric description/HELP text
+}
+```
+
+**`MetricsAggregator`**
+```go
+type MetricsAggregator struct {
+ registry *AgentRegistry
+ grpcService *FODCService // For requesting metrics from agents
+ mu sync.RWMutex
+ logger *logger.Logger
+}
+
+type MetricsFilter struct {
+ NodeIDs []string // Filter by specific node IDs (empty =
all nodes)
+ Role databasev1.Role // Filter by node role (optional)
+ StartTime *time.Time // Start time for time window (optional)
+ EndTime *time.Time // End time for time window (optional)
+}
+```
+
+##### Key Functions
+
+**`CollectMetricsFromAgents(ctx context.Context, filter *MetricsFilter)
([]*AggregatedMetric, error)`**
+- Requests metrics from all agents (or filtered agents) when external client
queries
+- Sends metrics request via StreamMetrics() to each agent with time window
filter (if specified)
+- Agents retrieve metrics from Flight Recorder (in-memory storage) filtered by
time window and respond
+- Waits for StreamMetricsRequest responses from agents (with timeout)
+- Identifies agent connection from stream context and looks up AgentKey
+- Enriches metrics with node metadata (IP, port, role, labels) from AgentKey
+- Combines metrics from all agents into a single list
+- Returns aggregated metrics (not stored, only returned)
+- Returns error if collection fails
+
+**`GetLatestMetrics(ctx context.Context) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with no time filter to get current metrics
+- Returns latest metrics from all agents
+- Used for `/metrics-windows` endpoint without time parameters
+- Returns error if collection fails
+
+**`GetMetricsWindow(ctx context.Context, startTime, endTime time.Time, filter
*MetricsFilter) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with time window filter
+- Agents filter metrics from Flight Recorder by the specified time range
+- Returns metrics within specified time range
+- Used for `/metrics-windows` endpoint with time parameters
+- Returns error if collection fails
+
+##### Configuration Flags
+
+*Note: No configuration flags needed for MetricsAggregator since metrics are
collected on-demand and not stored.*
+
+#### 1.4 HTTP/REST API Server Component
+
+**Purpose**: Exposes REST and Prometheus-style endpoints for external
consumption
+
+##### Core Responsibilities
+
+- **REST API**: Implements REST endpoints for cluster topology and
configuration
+- **Prometheus Integration**: Exposes Prometheus-compatible metrics endpoints
+- **Request Handling**: Handles HTTP requests and routes to appropriate
handlers
+- **Response Formatting**: Formats responses in appropriate formats (JSON,
Prometheus text)
+
+##### API Endpoints
+
+**`GET /metrics`**
+- Returns latest metrics from all agents (on-demand collection, not stored in
Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format
+- Query parameters:
+ - `node_id` (optional): Filter by node ID
+ - `role` (optional): Filter by role (liaison, datanode-hot, etc.)
+- Used by: Prometheus scrapers, monitoring systems
+
+**`GET /metrics-windows`**
+- Returns metrics from all agents (on-demand collection, not stored in Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format
+- Query parameters:
+ - `start_time`: Start time for time window (optional) - filters metrics by
start time (agents filter from Flight Recorder)
+ - `end_time`: End time for time window (optional) - filters metrics by end
time (agents filter from Flight Recorder)
+ - `node_id`: Filter by node ID (optional)
+ - `role`: Filter by node role (optional)
+
+**`GET /cluster`**
+- Returns cluster topology and node status
+- Format: JSON
+- Response includes:
+ - List of all registered nodes
+ - Node identity (ID, name, address)
+ - Node role
+ - Node labels
+ - Agent status (online/offline, last heartbeat)
+ - Node relationships
+- Note: Node information is obtained from agent registration data stored in
AgentRegistry
+
+**`GET /cluster/config`**
+- Returns node configurations
+- Format: JSON
+- Query parameters:
+ - `node_id`: Filter by node ID (optional)
+ - `role`: Filter by node role (optional)
+
+**`GET /health`**
+- Health check endpoint
+- Format: JSON
+- Returns proxy health status
+
+##### Core Types
+
+**`APIServer`**
+```go
+type APIServer struct {
+ metricsAggregator *MetricsAggregator
+ server *http.Server
+ logger *logger.Logger
+}
+```
+
+##### Key Functions
+
+**`Start(listenAddr string) error`**
+- Starts HTTP server
+- Registers all API endpoints
+- Returns error if start fails
+
+**`Stop() error`**
+- Gracefully stops HTTP server
+- Waits for in-flight requests
+- Returns error if stop fails
+
+##### Configuration Flags
+
+**`--http-listen-addr`**
+- **Type**: `string`
+- **Default**: `:17901`
+- **Description**: HTTP server listen address
+
+**`--http-read-timeout`**
+- **Type**: `duration`
+- **Default**: `10s`
+- **Description**: HTTP read timeout
+
+**`--http-write-timeout`**
+- **Type**: `duration`
+- **Default**: `10s`
+- **Description**: HTTP write timeout
+
+### 2. Agent Components
+
+**Purpose**: Components that run within FODC Agents to communicate with the
Proxy
+
+#### 2.1 Proxy Client Component
+
+**Purpose**: Manages connection and communication with the FODC Proxy
+
+##### Core Responsibilities
+
+- **Connection Management**: Establishes and maintains gRPC connection to Proxy
+- **Registration**: Registers agent with Proxy on startup
+- **Heartbeat Management**: Sends periodic heartbeats to maintain connection
+- **Stream Management**: Manages bi-directional streams for metrics
+- **Reconnection Logic**: Handles connection failures and automatic
reconnection
+
+##### Core Types
+
+**`ProxyClient`**
+```go
+type ProxyClient struct {
+ proxyAddr string
+ nodeIP string
+ nodePort int
Review Comment:
These two fields are not formatted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]