wu-sheng commented on code in PR #911:
URL:
https://github.com/apache/skywalking-banyandb/pull/911#discussion_r2640333835
##########
docs/design/fodc/proxy.md:
##########
@@ -0,0 +1,941 @@
+# FODC Proxy Development Design
+
+## Overview
+
+The FODC Proxy is the central control plane and data aggregator for the First
Occurrence Data Collection (FODC) infrastructure. It acts as a unified gateway
that aggregates observability data from multiple FODC Agents (each co-located
with a BanyanDB node) and exposes ecosystem-friendly interfaces to external
systems such as Prometheus and other observability platforms.
+
+The Proxy provides:
+
+1. **Agent Management**: Registration, health monitoring, and lifecycle
management of FODC Agents
+2. **Metrics Aggregation**: Collects and aggregates metrics from all agents
with enriched metadata
+3. **Cluster Topology**: Maintains an up-to-date view of cluster topology,
roles, and node states
+4. **Configuration Collection**: Aggregates and exposes node configurations
for consistency verification
+
+### Responsibilities
+
+**FODC Proxy Core Responsibilities**
+- Accept bi-directional gRPC connections from FODC Agents
+- Register and track agent lifecycle (online/offline, heartbeat monitoring)
+- Aggregate metrics from all agents with node metadata enrichment
+- Maintain cluster topology view based on agent registrations
+- Collect and expose node configurations for audit and consistency checks
+- Expose unified REST/Prometheus-style interfaces for external consumption
+- Provide proxy-level metrics (health, agent count, RPC latency, etc.)
+
+## Component Design
+
+### 1. Proxy Components
+
+#### 1.1 Agent Registry Component
+
+**Purpose**: Manages the lifecycle and state of all connected FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Registration**: Accepts agent registration requests via gRPC
+- **Health Monitoring**: Tracks agent heartbeat and connection status
+- **State Management**: Maintains agent state (online/offline, last heartbeat
time)
+- **Topology Building**: Aggregates agent registrations into cluster topology
view
+- **Connection Management**: Handles connection failures, reconnections, and
cleanup
+
+##### Core Types
+
+**`AgentInfo`**
+```go
+type AgentInfo struct {
+ NodeID string // Unique node identifier
+ NodeRole databasev1.Role // Node role (liaison, datanode-hot, etc.)
+ PrimaryAddress Address // Primary agent gRPC address
+ SecondaryAddresses map[string]Address // Secondary addresses with names (e.g., "metrics": Address, "gossip": Address)
+ Labels map[string]string // Node labels/metadata
+ RegisteredAt time.Time // Registration timestamp
+ LastHeartbeat time.Time // Last heartbeat timestamp
+ Status AgentStatus // Current agent status
+}
+
+type Address struct {
+ IP string
+ Port int
+}
+
+type AgentStatus string
+
+const (
+ AgentStatusOnline AgentStatus = "online"
+ AgentStatusOffline AgentStatus = "offline"
+)
+```
+
+**`AgentRegistry`**
+```go
+type AgentRegistry struct {
+ agents map[AgentKey]*AgentInfo // Map from agent key (IP+port+role+labels) to agent info
+ mu sync.RWMutex // Protects agents map
+ logger *logger.Logger
+ heartbeatTimeout time.Duration // Timeout for considering agent offline
+}
+
+type AgentKey struct {
+ IP string // Primary IP address
+ Port int // Primary port
+ Role databasev1.Role // Node role
+ Labels map[string]string // Node labels (used for key matching)
+}
+```
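
Go map keys must be comparable, and a struct that contains a `map` field is not, so `map[AgentKey]*AgentInfo` would not compile with `Labels map[string]string` inside the key. One possible workaround, sketched below, folds the labels into a canonical string. The names `comparableAgentKey`, `canonicalLabels`, and `newAgentKey` are hypothetical, and `Role` is a plain string here only to keep the sketch self-contained.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// comparableAgentKey is a hypothetical variant of AgentKey whose fields are
// all comparable, so it can be used directly as a Go map key.
type comparableAgentKey struct {
	IP     string
	Port   int
	Role   string // stands in for databasev1.Role in this sketch
	Labels string // canonical "k1=v1,k2=v2" form, sorted by key
}

// canonicalLabels renders labels in sorted key order so that two maps with
// the same entries always produce the same string.
func canonicalLabels(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+labels[k])
	}
	return strings.Join(parts, ",")
}

// newAgentKey builds a key from the primary address, role, and labels.
func newAgentKey(ip string, port int, role string, labels map[string]string) comparableAgentKey {
	return comparableAgentKey{IP: ip, Port: port, Role: role, Labels: canonicalLabels(labels)}
}

func main() {
	agents := map[comparableAgentKey]string{}
	k1 := newAgentKey("10.0.0.1", 17912, "liaison", map[string]string{"zone": "a", "env": "prod"})
	k2 := newAgentKey("10.0.0.1", 17912, "liaison", map[string]string{"env": "prod", "zone": "a"})
	agents[k1] = "node-1"
	// Both keys compare equal because the labels are rendered in sorted order.
	fmt.Println(k1 == k2, agents[k2])
}
```

This simple encoding is ambiguous if label keys or values may contain `=` or `,`; a real implementation would need to escape them or hash the sorted pairs.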
+
+##### Key Functions
+
+**`RegisterAgent(ctx context.Context, info *AgentInfo) error`**
+- Registers a new agent or updates existing agent information
+- Creates AgentKey from primary IP + port + role + labels
+- Uses AgentKey as the map key (not nodeID)
+- Validates primary address and role
+- Updates topology view
+- Returns error if registration fails
+
+**`UnregisterAgent(key AgentKey) error`**
+- Removes agent from registry using AgentKey
+- Cleans up associated resources
+- Updates topology view
+- Called in the following scenarios:
+ - When agent's registration stream closes (connection lost)
+ - When all of the agent's streams are closed and the connection is terminated
+ - When agent has been offline longer than `--agent-cleanup-timeout`
(detected by heartbeat health check)
+ - During graceful shutdown or manual agent removal
+ - When agent explicitly requests unregistration via stream
+
+**`UpdateHeartbeat(key AgentKey) error`**
+- Updates last heartbeat timestamp for agent using AgentKey
+- Marks agent as online if it was offline
+- Returns error if agent not found
+
+**`GetAgent(ip string, port int, role databasev1.Role, labels map[string]string) (*AgentInfo, error)`**
+- Retrieves agent information by primary IP + port + role + labels
+- Creates AgentKey from the provided parameters
+- Looks up agent in registry using AgentKey
+- Returns error if agent not found
+
+**`ListAgents() []*AgentInfo`**
+- Returns list of all registered agents
+- Thread-safe read operation
+
+**`ListAgentsByRole(role databasev1.Role) []*AgentInfo`**
+- Returns agents filtered by role
+- Useful for role-specific operations
+
+**`CheckAgentHealth() error`**
+- Periodically checks agent health based on heartbeat timeout
+- Marks agents as offline if heartbeat timeout exceeded
+- Continuously runs heartbeat health checks regardless of cleanup timeout
+- Unregisters agents that have been offline longer than
`--agent-cleanup-timeout`
+- Agents that cannot maintain connection will be removed after the cleanup
timeout period
+- Returns aggregated health status
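
The two-stage behaviour described for `CheckAgentHealth()` (mark offline first, remove later) could look roughly like the sketch below. It assumes that "offline longer than the cleanup timeout" is measured from the last heartbeat, which the design does not state explicitly. `agentState`, `sweep`, and `runDemo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// agentState is a reduced, hypothetical stand-in for AgentInfo.
type agentState struct {
	lastHeartbeat time.Time
	online        bool
}

// sweep applies both timeouts to every agent: agents silent for longer than
// cleanupTimeout are removed, agents silent for longer than heartbeatTimeout
// are marked offline. It returns the IDs of the removed agents.
func sweep(agents map[string]*agentState, now time.Time, heartbeatTimeout, cleanupTimeout time.Duration) []string {
	var removed []string
	for id, a := range agents {
		silent := now.Sub(a.lastHeartbeat)
		switch {
		case silent > cleanupTimeout:
			delete(agents, id) // deleting during range is safe in Go
			removed = append(removed, id)
		case silent > heartbeatTimeout:
			a.online = false
		}
	}
	return removed
}

// runDemo runs one sweep with the default timeouts from this document.
func runDemo() (removed []string, freshOnline, staleOnline bool) {
	now := time.Now()
	agents := map[string]*agentState{
		"fresh": {lastHeartbeat: now.Add(-5 * time.Second), online: true},
		"stale": {lastHeartbeat: now.Add(-1 * time.Minute), online: true},
		"gone":  {lastHeartbeat: now.Add(-10 * time.Minute), online: true},
	}
	removed = sweep(agents, now, 30*time.Second, 5*time.Minute)
	return removed, agents["fresh"].online, agents["stale"].online
}

func main() {
	removed, fresh, stale := runDemo()
	fmt.Println(removed, fresh, stale)
}
```

Passing `now` in as a parameter keeps the sweep deterministic and easy to test; a real registry would call it from a ticker while holding its lock.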
+
+##### Configuration Flags
+
+**`--agent-heartbeat-timeout`**
+- **Type**: `duration`
+- **Default**: `30s`
+- **Description**: Timeout for considering an agent offline if no heartbeat
received
+
+**`--max-agents`**
+- **Type**: `int`
+- **Default**: `1000`
+- **Description**: Maximum number of agents that can be registered
+
+**`--agent-cleanup-timeout`**
+- **Type**: `duration`
+- **Default**: `5m`
+- **Minimum**: Must be greater than `--agent-heartbeat-timeout`
+- **Description**: Timeout for automatically unregistering agents that have
been offline. Agents that cannot maintain connection will be removed after
being offline longer than this timeout. The heartbeat health check continues
running regardless of this timeout. This timeout must be greater than
`--agent-heartbeat-timeout` to allow for proper health checking.
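
As an illustration of the constraint between the two timeouts, the sketch below parses the three registry flags and rejects a cleanup timeout that is not greater than the heartbeat timeout. It uses the standard library `flag` package purely for self-containment, which is an assumption; the project may wire flags differently. `registryConfig` and `parseRegistryFlags` are hypothetical names.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"time"
)

// registryConfig holds the registry flags described above.
type registryConfig struct {
	HeartbeatTimeout time.Duration
	CleanupTimeout   time.Duration
	MaxAgents        int
}

// parseRegistryFlags parses the flags with the documented defaults and
// enforces that the cleanup timeout exceeds the heartbeat timeout.
func parseRegistryFlags(args []string) (registryConfig, error) {
	var cfg registryConfig
	fs := flag.NewFlagSet("fodc-proxy", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep the sketch quiet on parse errors
	fs.DurationVar(&cfg.HeartbeatTimeout, "agent-heartbeat-timeout", 30*time.Second, "heartbeat timeout")
	fs.DurationVar(&cfg.CleanupTimeout, "agent-cleanup-timeout", 5*time.Minute, "cleanup timeout")
	fs.IntVar(&cfg.MaxAgents, "max-agents", 1000, "maximum number of registered agents")
	if err := fs.Parse(args); err != nil {
		return registryConfig{}, err
	}
	if cfg.CleanupTimeout <= cfg.HeartbeatTimeout {
		return registryConfig{}, errors.New("--agent-cleanup-timeout must be greater than --agent-heartbeat-timeout")
	}
	return cfg, nil
}

func main() {
	cfg, err := parseRegistryFlags([]string{"--agent-heartbeat-timeout=45s"})
	fmt.Println(cfg.HeartbeatTimeout, cfg.CleanupTimeout, err)

	// A cleanup timeout below the heartbeat timeout is rejected.
	_, err = parseRegistryFlags([]string{"--agent-cleanup-timeout=10s"})
	fmt.Println(err)
}
```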
+
+#### 1.2 gRPC Server Component
+
+**Purpose**: Handles bi-directional gRPC communication with FODC Agents
+
+##### Core Responsibilities
+
+- **Agent Connection Handling**: Accepts and manages gRPC connections from
agents
+- **Streaming Support**: Supports bi-directional streaming for metrics
+- **Protocol Implementation**: Implements FODC gRPC service protocol
+- **Connection Lifecycle**: Manages connection establishment, maintenance, and
cleanup
+
+##### Core Types
+
+**`FODCService`** (gRPC Service Implementation)
+```go
+type FODCService struct {
+ registry *AgentRegistry
+ metricsAggregator *MetricsAggregator
+ logger *logger.Logger
+}
+
+// Example gRPC service methods (to be defined in proto)
+// RegisterAgent(stream RegisterAgentRequest) (stream RegisterAgentResponse) error
+// StreamMetrics(stream StreamMetricsRequest) (stream StreamMetricsResponse) error
+```
+
+**`AgentConnection`**
+```go
+type AgentConnection struct {
+ Key AgentKey // Agent key (IP+port+role+labels) for registry lookup
+ NodeID string
+ Stream grpc.ServerStream
+ Context context.Context
+ Cancel context.CancelFunc
+ LastActivity time.Time
+}
+```
+
+##### Key Functions
+
+**`RegisterAgent(stream FODCService_RegisterAgentServer) error`**
+- Handles bi-directional agent registration stream
+- Receives registration requests from agent (includes primary address, role,
labels)
+- Creates AgentKey from primary IP + port + role + labels
+- Validates registration information
+- Registers agent with AgentRegistry using AgentKey
+- Sends registration responses
+- Maintains stream for heartbeat and re-registration
+
+**`StreamMetrics(stream FODCService_StreamMetricsServer) error`**
+- Handles bi-directional metrics streaming
+- Sends metrics requests from Proxy to agent (on-demand collection)
+- Receives metrics from agent at Proxy in response to requests
+- Proxy initiates by sending a metrics request via StreamMetricsResponse when
an external client queries metrics
+- Agent responds with StreamMetricsRequest containing the collected metrics
+- Manages stream lifecycle
+
+##### Connection Lifecycle Management
+
+**Stream Closure Handling**
+- When a stream closes (due to network error, agent shutdown, or timeout), the
gRPC server should:
+ 1. Detect stream closure via context cancellation or stream error
+ 2. Extract agent key (primary IP + port + role + labels) from the connection
+ 3. Check if this is the last active stream for the agent
+ 4. If all streams are closed, call `AgentRegistry.UnregisterAgent(agentKey)`
to clean up
+ 5. Update topology to reflect agent offline status
+
+**Graceful vs. Ungraceful Disconnection**
+- **Graceful**: Agent sends explicit disconnect message before closing stream
→ immediate unregistration
+- **Ungraceful**: Stream closes unexpectedly → unregistration happens after
detecting all streams closed
+- **Heartbeat Timeout**: Agent marked offline by `CheckAgentHealth()` →
unregistration occurs after being offline longer than
`--agent-cleanup-timeout`. Heartbeat health checks continue running regardless.
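
The "last active stream" check in the closure handling above can be sketched as a per-agent stream counter that fires an unregister callback when the count reaches zero. This is one possible shape, not the PR's implementation; `streamTracker`, `opened`, and `closed` are hypothetical names, and agents are identified by a plain string instead of `AgentKey` for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// streamTracker counts open streams per agent and calls unregister once the
// last stream of an agent has closed.
type streamTracker struct {
	mu         sync.Mutex
	open       map[string]int
	unregister func(agentID string)
}

func newStreamTracker(unregister func(agentID string)) *streamTracker {
	return &streamTracker{open: map[string]int{}, unregister: unregister}
}

// opened records a newly established stream for the agent.
func (s *streamTracker) opened(agentID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.open[agentID]++
}

// closed records a stream closure and reports whether it was the agent's
// last open stream. The callback runs outside the lock.
func (s *streamTracker) closed(agentID string) bool {
	s.mu.Lock()
	n := s.open[agentID]
	if n == 0 {
		s.mu.Unlock()
		return false // unknown agent or already cleaned up
	}
	n--
	last := n == 0
	if last {
		delete(s.open, agentID)
	} else {
		s.open[agentID] = n
	}
	s.mu.Unlock()
	if last {
		s.unregister(agentID)
	}
	return last
}

func main() {
	tracker := newStreamTracker(func(id string) { fmt.Println("unregister", id) })
	tracker.opened("node1") // registration stream
	tracker.opened("node1") // metrics stream
	fmt.Println(tracker.closed("node1")) // one stream is still open
	fmt.Println(tracker.closed("node1")) // last stream closed, callback fires first
}
```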
+
+##### Configuration Flags
+
+**`--grpc-listen-addr`**
+- **Type**: `string`
+- **Default**: `:17900`
+- **Description**: gRPC server address where the Proxy listens for agent
connections
+
+**`--grpc-max-msg-size`**
+- **Type**: `int`
+- **Default**: `4194304` (4MB)
+- **Description**: Maximum message size for gRPC messages
+
+#### 1.3 Metrics Aggregator Component
+
+**Purpose**: Aggregates and enriches metrics from all agents
+
+##### Core Responsibilities
+
+- **On-Demand Metrics Collection**: Requests metrics from agents via gRPC
streams when external clients query metrics
+- **Metrics Request Coordination**: Coordinates metrics requests to multiple
agents concurrently
+- **Metadata Enrichment**: Adds node metadata (role, ID, labels) to metrics
+- **Normalization**: Normalizes metric formats and labels across agents
+- **Time Window Filtering**: Filters metrics by time window when requested by
external clients (agents filter based on Flight Recorder data)
+
+##### Core Types
+
+**`AggregatedMetric`**
+```go
+type AggregatedMetric struct {
+ Name string // Metric name
+ Labels map[string]string // Metric labels (including node metadata)
+ Value float64 // Metric value
+ Timestamp time.Time // Metric timestamp
+ NodeID string // Source node ID
+ NodeRole databasev1.Role // Source node role
+ Description string // Metric description/HELP text
+}
+```
+
+**`MetricsAggregator`**
+```go
+type MetricsAggregator struct {
+ registry *AgentRegistry
+ grpcService *FODCService // For requesting metrics from agents
+ mu sync.RWMutex
+ logger *logger.Logger
+}
+
+type MetricsFilter struct {
+ NodeIDs []string // Filter by specific node IDs (empty = all nodes)
+ Role databasev1.Role // Filter by node role (optional)
+ StartTime *time.Time // Start time for time window (optional)
+ EndTime *time.Time // End time for time window (optional)
+}
+```
+
+##### Key Functions
+
+**`CollectMetricsFromAgents(ctx context.Context, filter *MetricsFilter) ([]*AggregatedMetric, error)`**
+- Requests metrics from all agents (or filtered agents) when external client
queries
+- Sends metrics request via StreamMetrics() to each agent with time window
filter (if specified)
+- Agents retrieve metrics from Flight Recorder (in-memory storage) filtered by
time window and respond
+- Waits for StreamMetricsRequest responses from agents (with timeout)
+- Identifies agent connection from stream context and looks up AgentKey
+- Enriches metrics with node metadata (IP, port, role, labels) from AgentKey
+- Combines metrics from all agents into a single list
+- Returns aggregated metrics (not stored, only returned)
+- Returns error if collection fails
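
The collection steps above amount to a concurrent fan-out with a deadline, followed by metadata enrichment. The sketch below shows that shape under simplifying assumptions: `fetchFunc` stands in for one request/response round trip on an agent's `StreamMetrics` stream, and failures are reported per agent rather than as a single error. All type and function names here are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type metric struct {
	Name   string
	Labels map[string]string
	Value  float64
}

// fetchFunc stands in for one metrics round trip with a single agent.
type fetchFunc func(ctx context.Context) ([]metric, error)

type agent struct {
	NodeID string
	Role   string
	Fetch  fetchFunc
}

// collect queries all agents concurrently, waits at most timeout, tags each
// metric with node_id and node_role, and returns the combined metrics plus
// per-agent errors. It relies on Fetch honouring context cancellation.
func collect(ctx context.Context, agents []agent, timeout time.Duration) ([]metric, map[string]error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		out  []metric
		errs = map[string]error{}
	)
	for _, a := range agents {
		wg.Add(1)
		go func(a agent) {
			defer wg.Done()
			ms, err := a.Fetch(ctx)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				errs[a.NodeID] = err
				return
			}
			for _, m := range ms {
				// Copy labels so the agent's original map is not mutated.
				labels := map[string]string{"node_id": a.NodeID, "node_role": a.Role}
				for k, v := range m.Labels {
					labels[k] = v
				}
				out = append(out, metric{Name: m.Name, Labels: labels, Value: m.Value})
			}
		}(a)
	}
	wg.Wait()
	return out, errs
}

// runDemo collects from one responsive agent and one that never answers.
func runDemo() ([]metric, map[string]error) {
	ok := func(ctx context.Context) ([]metric, error) {
		return []metric{{Name: "up", Value: 1}}, nil
	}
	slow := func(ctx context.Context) ([]metric, error) {
		<-ctx.Done() // gives up only when the deadline passes
		return nil, ctx.Err()
	}
	return collect(context.Background(), []agent{
		{NodeID: "node1", Role: "liaison", Fetch: ok},
		{NodeID: "node2", Role: "datanode-hot", Fetch: slow},
	}, 50*time.Millisecond)
}

func main() {
	ms, errs := runDemo()
	fmt.Println(len(ms), ms[0].Labels, errs)
}
```

Returning partial results alongside per-agent errors is a design choice of this sketch: it lets one unresponsive agent degrade a scrape instead of failing it outright.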
+
+**`GetLatestMetrics(ctx context.Context) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with no time filter to get current metrics
+- Returns latest metrics from all agents
+- Used for the `/metrics` endpoint, and for `/metrics-windows` when no time parameters are given
+- Returns error if collection fails
+
+**`GetMetricsWindow(ctx context.Context, startTime, endTime time.Time, filter *MetricsFilter) ([]*AggregatedMetric, error)`**
+- Triggers on-demand collection from all agents
+- Calls CollectMetricsFromAgents() with time window filter
+- Agents filter metrics from Flight Recorder by the specified time range
+- Returns metrics within specified time range
+- Used for `/metrics-windows` endpoint with time parameters
+- Returns error if collection fails
+
+##### Configuration Flags
+
+*Note: No configuration flags needed for MetricsAggregator since metrics are
collected on-demand and not stored.*
+
+#### 1.4 HTTP/REST API Server Component
+
+**Purpose**: Exposes REST and Prometheus-style endpoints for external
consumption
+
+##### Core Responsibilities
+
+- **REST API**: Implements REST endpoints for cluster topology and
configuration
+- **Prometheus Integration**: Exposes Prometheus-compatible metrics endpoints
+- **Request Handling**: Handles HTTP requests and routes to appropriate
handlers
+- **Response Formatting**: Formats responses in appropriate formats (JSON,
Prometheus text)
+
+##### API Endpoints
+
+**`GET /metrics`**
+- Returns latest metrics from all agents (on-demand collection, not stored in
Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format
+- Query parameters:
+ - `node_id` (optional): Filter by node ID
+ - `role` (optional): Filter by role (liaison, datanode-hot, etc.)
+- Used by: Prometheus scrapers, monitoring systems
+
+**`GET /metrics-windows`**
+- Returns metrics from all agents (on-demand collection, not stored in Proxy)
+- Includes node metadata (role, ID, labels)
+- Format: Prometheus text format
+- Query parameters:
+ - `start_time`: Start time for time window (optional) - filters metrics by
start time (agents filter from Flight Recorder)
+ - `end_time`: End time for time window (optional) - filters metrics by end
time (agents filter from Flight Recorder)
+ - `node_id`: Filter by node ID (optional)
+ - `role`: Filter by node role (optional)
+
+**`GET /cluster`**
+- Returns cluster topology and node status
+- Format: JSON
+- Response includes:
+ - List of all registered nodes
+ - Node identity (ID, name, address)
+ - Node role
+ - Node labels
+ - Agent status (online/offline, last heartbeat)
+ - Node relationships
+- Note: Node information is obtained from agent registration data stored in
AgentRegistry
+
+**`GET /cluster/config`**
+- Returns node configurations
+- Format: JSON
+- Query parameters:
+ - `node_id`: Filter by node ID (optional)
+ - `role`: Filter by node role (optional)
+
+**`GET /health`**
+- Health check endpoint
+- Format: JSON
+- Returns proxy health status
+
+##### Core Types
+
+**`APIServer`**
+```go
+type APIServer struct {
+ metricsAggregator *MetricsAggregator
+ server *http.Server
+ logger *logger.Logger
+}
+```
+
+##### Key Functions
+
+**`Start(listenAddr string) error`**
+- Starts HTTP server
+- Registers all API endpoints
+- Returns error if start fails
+
+**`Stop() error`**
+- Gracefully stops HTTP server
+- Waits for in-flight requests
+- Returns error if stop fails
+
+##### Configuration Flags
+
+**`--http-listen-addr`**
+- **Type**: `string`
+- **Default**: `:17901`
+- **Description**: HTTP server listen address
+
+**`--http-read-timeout`**
+- **Type**: `duration`
+- **Default**: `10s`
+- **Description**: HTTP read timeout
+
+**`--http-write-timeout`**
+- **Type**: `duration`
+- **Default**: `10s`
+- **Description**: HTTP write timeout
+
+### 2. Agent Components
+
+**Purpose**: Components that run within FODC Agents to communicate with the
Proxy
+
+#### 2.1 Proxy Client Component
+
+**Purpose**: Manages connection and communication with the FODC Proxy
+
+##### Core Responsibilities
+
+- **Connection Management**: Establishes and maintains gRPC connection to Proxy
+- **Registration**: Registers agent with Proxy on startup
+- **Heartbeat Management**: Sends periodic heartbeats to maintain connection
+- **Stream Management**: Manages bi-directional streams for metrics
+- **Reconnection Logic**: Handles connection failures and automatic
reconnection
+
+##### Core Types
+
+**`ProxyClient`**
+```go
+type ProxyClient struct {
+ proxyAddr string
+ nodeIP string
+ nodePort int
+ nodeRole databasev1.Role
+ labels map[string]string
+ conn *grpc.ClientConn
+ client fodcv1.FODCServiceClient
+ heartbeatTicker *time.Ticker
+ mu sync.RWMutex
+ logger *logger.Logger
+}
+
+type MetricsRequestFilter struct {
+ StartTime *time.Time // Start time for time window (optional)
+ EndTime *time.Time // End time for time window (optional)
+}
+```
+
+**Note**: The ProxyClient integrates with Flight Recorder, which continuously
collects and stores metrics in-memory. When the Proxy requests metrics, the
agent retrieves them from Flight Recorder rather than collecting them on-demand.
+
+##### Key Functions
+
+**`Connect(ctx context.Context) error`**
+- Establishes gRPC connection to Proxy
+- Returns error if connection fails
+
+**`StartRegistrationStream(ctx context.Context) error`**
+- Establishes bi-directional registration stream with Proxy
+- Sends initial RegisterAgentRequest with primary address (IP + port), role,
and labels
+- Receives RegisterAgentResponse with heartbeat interval
+- Maintains stream for periodic heartbeat and re-registration
+- Returns error if stream establishment fails
+
+**`StartMetricsStream(ctx context.Context) error`**
+- Establishes bi-directional metrics stream with Proxy
+- Maintains stream for on-demand metrics requests
+- Listens for metrics request commands from Proxy
+- When request received, retrieves metrics from Flight Recorder and sends
response
+- Returns error if stream fails
+
+**`RetrieveAndSendMetrics(ctx context.Context, filter *MetricsRequestFilter) error`**
+- Retrieves metrics from Flight Recorder when requested by Proxy
+- Queries Flight Recorder (in-memory storage) for metrics
+- Flight Recorder contains continuously collected metrics from:
+ * BanyanDB /metrics endpoint scraping
+ * KTM/OS telemetry collection
+ * Other observability sources
+- Filters metrics by time window if specified in request
+- Packages metrics in StreamMetricsRequest (includes metrics and timestamp)
+- Sends metrics to Proxy via StreamMetrics() stream
+- Returns error if retrieval or send fails
+
+**`SendHeartbeat(ctx context.Context) error`**
+- Sends heartbeat to Proxy
+- Updates connection status
+- Returns error if heartbeat fails
+
+**`Disconnect() error`**
+- Closes connection to Proxy
+- Cleans up resources
+- Returns error if disconnect fails
+
+##### Configuration Flags
+
+**`--proxy-addr`**
+- **Type**: `string`
+- **Default**: `localhost:17900`
+- **Description**: FODC Proxy gRPC address
+
+**`--node-ip`**
+- **Type**: `string`
+- **Default**: (required, no default)
+- **Description**: IP address for this BanyanDB node's primary gRPC address.
Used as part of AgentKey for registry identification.
+
+**`--node-port`**
+- **Type**: `int`
+- **Default**: (required, no default)
+- **Description**: Port number for this BanyanDB node's primary gRPC address.
Used as part of AgentKey for registry identification.
+
+**`--node-role`**
+- **Type**: `string`
+- **Default**: (required, no default)
+- **Description**: Role of this BanyanDB node. Valid values: `liaison`,
`datanode-hot`, `datanode-warm`, `datanode-cold`, etc. Must match the node's
actual role in the cluster.
+
+**`--node-labels`**
+- **Type**: `string` (comma-separated key=value pairs)
+- **Default**: (optional)
+- **Description**: Labels/metadata for this node. Format:
`key1=value1,key2=value2`. Examples: `zone=us-west-1,env=production`. Used for
filtering and grouping nodes in the Proxy.
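
A minimal sketch of parsing the `key1=value1,key2=value2` format described for `--node-labels`. The function name `parseLabels` is hypothetical, and trimming whitespace and rejecting empty keys are assumptions, since the design does not specify how malformed input is handled.

```go
package main

import (
	"fmt"
	"strings"
)

// parseLabels turns "k1=v1,k2=v2" into a map. An empty input yields an empty
// map; a pair without "=" or with an empty key is an error.
func parseLabels(s string) (map[string]string, error) {
	labels := map[string]string{}
	if strings.TrimSpace(s) == "" {
		return labels, nil
	}
	for _, pair := range strings.Split(s, ",") {
		key, value, found := strings.Cut(pair, "=")
		key = strings.TrimSpace(key)
		if !found || key == "" {
			return nil, fmt.Errorf("invalid label %q: expected key=value", pair)
		}
		labels[key] = strings.TrimSpace(value)
	}
	return labels, nil
}

func main() {
	labels, err := parseLabels("zone=us-west-1,env=production")
	fmt.Println(labels, err)
}
```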
+
+**`--heartbeat-interval`**
+- **Type**: `duration`
+- **Default**: `10s`
+- **Description**: Interval for sending heartbeats to Proxy. Note: The Proxy
may override this value in RegisterAgentResponse.
+
+**`--reconnect-interval`**
+- **Type**: `duration`
+- **Default**: `5s`
+- **Description**: Interval for reconnection attempts when connection to Proxy
is lost
+
+## API Design
+
+### gRPC API
+
+#### Service Definition
+
+```protobuf
+
+syntax = "proto3";
+
+package banyandb.fodc.v1;
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1";
+
+service FODCService {
+ // Bi-directional stream for agent registration and heartbeat
+ rpc RegisterAgent(stream RegisterAgentRequest) returns (stream RegisterAgentResponse);
+
+ // Bi-directional stream for metrics
+ // Agent sends StreamMetricsRequest (metrics data), Proxy sends StreamMetricsResponse (metrics requests)
+ rpc StreamMetrics(stream StreamMetricsRequest) returns (stream StreamMetricsResponse);
+}
+
+message RegisterAgentRequest {
+ string node_role = 1;
+ map<string, string> labels = 2;
+ Address primary_address = 3;
+ map<string, Address> secondary_addresses = 4;
+}
+
+message Address {
+ string ip = 1;
+ int32 port = 2;
+}
+
+message RegisterAgentResponse {
+ bool success = 1;
+ string message = 2;
+ int64 heartbeat_interval_seconds = 3;
+}
+
+message StreamMetricsRequest {
+ repeated Metric metrics = 1;
+ google.protobuf.Timestamp timestamp = 2;
+}
+
+message Metric {
+ string name = 1;
+ map<string, string> labels = 2;
+ double value = 3;
+ string description = 4;
+}
+
+message StreamMetricsResponse {
+ google.protobuf.Timestamp start_time = 1; // Optional start time for time window
+ google.protobuf.Timestamp end_time = 2; // Optional end time for time window
+}
+```
+
+### REST API
+
+#### Endpoints
+
+**`GET /metrics`**
+- **Description**: Returns latest metrics from all agents (on-demand
collection, not stored in Proxy)
+- **Query Parameters**:
+ - `node_id` (optional): Filter by node ID
+ - `role` (optional): Filter by role (liaison, datanode-hot, etc.)
+- **Response**: Prometheus text format with node metadata labels
+- **Note**: Metrics are collected on-demand from agents' Flight Recorder. No
metrics are stored in the Proxy.
+- **Example**:
+```
+# HELP banyandb_stream_tst_inverted_index_total_doc_count Total document count
+# TYPE banyandb_stream_tst_inverted_index_total_doc_count gauge
+banyandb_stream_tst_inverted_index_total_doc_count{index="test",node_id="node1",node_role="datanode-hot"} 12345
+```
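
For illustration, a response like the example above could be produced by a small renderer along these lines. This is a sketch that writes the Prometheus text exposition format by hand to stay dependency-free; a real implementation might prefer an existing encoder library. `sample` and `render` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

type sample struct {
	Name, Help, Type string
	Labels           map[string]string
	Value            float64
}

// labelEscaper escapes the characters that the Prometheus text format
// requires to be escaped inside label values.
var labelEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)

// render writes the HELP and TYPE lines followed by one sample line, with
// labels in sorted key order so the output is deterministic.
func render(s sample) string {
	keys := make([]string, 0, len(s.Labels))
	for k := range s.Labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	fmt.Fprintf(&b, "# HELP %s %s\n", s.Name, s.Help)
	fmt.Fprintf(&b, "# TYPE %s %s\n", s.Name, s.Type)
	b.WriteString(s.Name)
	if len(keys) > 0 {
		b.WriteByte('{')
		for i, k := range keys {
			if i > 0 {
				b.WriteByte(',')
			}
			b.WriteString(k + `="` + labelEscaper.Replace(s.Labels[k]) + `"`)
		}
		b.WriteByte('}')
	}
	b.WriteByte(' ')
	b.WriteString(strconv.FormatFloat(s.Value, 'g', -1, 64))
	b.WriteByte('\n')
	return b.String()
}

func main() {
	fmt.Print(render(sample{
		Name:   "banyandb_stream_tst_inverted_index_total_doc_count",
		Help:   "Total document count",
		Type:   "gauge",
		Labels: map[string]string{"index": "test", "node_id": "node1", "node_role": "datanode-hot"},
		Value:  12345,
	}))
}
```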
+
+**`GET /metrics-windows`**
+- **Description**: Returns metrics from all agents (on-demand collection, not
stored in Proxy)
+- **Query Parameters**:
+ - `start_time` (optional): RFC3339 timestamp - filters metrics by start time
(agents filter from Flight Recorder)
+ - `end_time` (optional): RFC3339 timestamp - filters metrics by end time
(agents filter from Flight Recorder)
+ - `node_id` (optional): Filter by node ID
+ - `role` (optional): Filter by role (liaison, datanode-hot, etc.)
+- **Response**: Prometheus text format with node metadata labels
+- **Note**: Metrics are collected on-demand from agents' Flight Recorder. No
metrics are stored in the Proxy.
+- **Example**:
+```
+# HELP banyandb_stream_tst_inverted_index_total_doc_count Total document count
+# TYPE banyandb_stream_tst_inverted_index_total_doc_count gauge
+banyandb_stream_tst_inverted_index_total_doc_count{index="test",node_id="node1",node_role="datanode-hot"} 12345
+```
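
The optional RFC3339 `start_time` and `end_time` parameters could be parsed as in the sketch below. The `window` type and `parseWindow` function are hypothetical, and rejecting a window whose end precedes its start is an assumption that the design does not spell out.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"time"
)

// window mirrors the optional start/end pair of MetricsFilter; a nil field
// means the parameter was not supplied.
type window struct {
	Start, End *time.Time
}

// parseWindow extracts start_time and end_time from a raw query string.
func parseWindow(rawQuery string) (window, error) {
	q, err := url.ParseQuery(rawQuery)
	if err != nil {
		return window{}, err
	}
	var w window
	params := []struct {
		name string
		dst  **time.Time
	}{{"start_time", &w.Start}, {"end_time", &w.End}}
	for _, p := range params {
		raw := q.Get(p.name)
		if raw == "" {
			continue // parameter is optional
		}
		ts, err := time.Parse(time.RFC3339, raw)
		if err != nil {
			return window{}, fmt.Errorf("invalid %s: %w", p.name, err)
		}
		*p.dst = &ts
	}
	if w.Start != nil && w.End != nil && w.End.Before(*w.Start) {
		return window{}, errors.New("end_time is before start_time")
	}
	return w, nil
}

func main() {
	w, err := parseWindow("start_time=2024-01-01T00:00:00Z&end_time=2024-01-01T01:00:00Z")
	fmt.Println(w.Start, w.End, err)
}
```

Timestamps with a `+HH:MM` offset must be percent-encoded in the query string (`%2B`), because a literal `+` decodes to a space.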
+
+**`GET /cluster`**
+- **Description**: Returns cluster topology and node information
Review Comment:
Note: the initial version only provides the node list, not the topology.