hanahmily commented on code in PR #963: URL: https://github.com/apache/skywalking-banyandb/pull/963#discussion_r2754763943
########## fodc/proxy/internal/cluster/manager.go: ########## @@ -0,0 +1,301 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package cluster provides cluster state management for FODC proxy. +package cluster + +import ( + "context" + "fmt" + "sync" + "time" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +const ( + defaultCollectionTimeout = 10 * time.Second +) + +// NodeWithStringRoles represents a node with roles as strings instead of numeric enum values. +type NodeWithStringRoles struct { + *databasev1.Node + + Status *registry.AgentStatus `json:"status,omitempty"` + LastHeartbeat *time.Time `json:"last_heartbeat,omitempty"` + Roles []string `json:"roles"` +} + +// TopologyMap represents processed cluster data with string roles. +type TopologyMap struct { + Nodes []*NodeWithStringRoles `json:"nodes"` + Calls []*fodcv1.Call `json:"calls"` +} + +// RequestSender is an interface for sending cluster data requests to agents. 
+type RequestSender interface { + RequestClusterData(agentID string) error +} + +// Manager manages cluster state from multiple agents. +type Manager struct { + log *logger.Logger + registry *registry.AgentRegistry + grpcService RequestSender + collecting map[string]chan *TopologyMap + mu sync.RWMutex + collectingMu sync.RWMutex +} + +// NewManager creates a new cluster state manager. +func NewManager(registry *registry.AgentRegistry, grpcService RequestSender, log *logger.Logger) *Manager { + return &Manager{ + registry: registry, + grpcService: grpcService, + log: log, + collecting: make(map[string]chan *TopologyMap), + } +} + +// SetGRPCService sets the gRPC service for sending cluster data requests. +func (m *Manager) SetGRPCService(grpcService RequestSender) { + m.mu.Lock() + defer m.mu.Unlock() + m.grpcService = grpcService +} + +// UpdateClusterTopology updates cluster topology for a specific agent. +func (m *Manager) UpdateClusterTopology(agentID string, topology *fodcv1.Topology) { + topologyMap := convertTopologyToMap(topology) + m.collectingMu.RLock() + collectCh, exists := m.collecting[agentID] + m.collectingMu.RUnlock() + if exists { + select { + case collectCh <- topologyMap: + default: + m.log.Warn().Str("agent_id", agentID).Msg("Topology collection channel full, dropping topology") + } + } +} + +// RemoveTopology removes cluster topology for a specific agent. +func (m *Manager) RemoveTopology(agentID string) { + m.collectingMu.Lock() + defer m.collectingMu.Unlock() + if collectCh, exists := m.collecting[agentID]; exists { + close(collectCh) + delete(m.collecting, agentID) + } +} + +// CollectClusterTopology requests and collects cluster topology from all agents with context. 
+func (m *Manager) CollectClusterTopology(ctx context.Context) *TopologyMap { + if m.registry == nil { + return &TopologyMap{ + Nodes: make([]*NodeWithStringRoles, 0), + Calls: make([]*fodcv1.Call, 0), + } + } + agents := m.registry.ListAgents() + if len(agents) == 0 { + return &TopologyMap{ + Nodes: make([]*NodeWithStringRoles, 0), + Calls: make([]*fodcv1.Call, 0), + } + } + collectChs := make(map[string]chan *TopologyMap) + agentIDs := make([]string, 0, len(agents)) + m.collectingMu.Lock() + for _, agentInfo := range agents { + collectCh := make(chan *TopologyMap, 1) + collectChs[agentInfo.AgentID] = collectCh + m.collecting[agentInfo.AgentID] = collectCh + agentIDs = append(agentIDs, agentInfo.AgentID) + } + m.collectingMu.Unlock() + defer func() { + m.collectingMu.Lock() + for _, agentID := range agentIDs { + if collectCh, exists := m.collecting[agentID]; exists { + close(collectCh) Review Comment: If two overlapping GET calls both register a collection channel in `m.collecting` for the same agent, the second call overwrites the first’s channel for that agent, and the first call's deferred cleanup then closes the channel the second call is still waiting on. Extend the lock's scope to the whole function so that no concurrent collections can occur. ########## fodc/agent/internal/cluster/collector.go: ########## @@ -0,0 +1,474 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package cluster provides cluster state collection functionality for FODC agent. +package cluster + +import ( + "context" + "fmt" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +const ( + maxRetries = 3 + initialBackoff = 100 * time.Millisecond + maxBackoff = 5 * time.Second + nodeInfoFetchTimeout = 30 * time.Second +) + +// TopologyMap represents processed cluster data for a single endpoint. +type TopologyMap struct { + Nodes []*databasev1.Node `json:"nodes"` + Calls []*fodcv1.Call `json:"calls"` +} + +// endpointClient holds gRPC connection and clients for a single endpoint. +type endpointClient struct { + conn *grpc.ClientConn + clusterClient databasev1.ClusterStateServiceClient + nodeQueryClient databasev1.NodeQueryServiceClient + addr string +} + +// Collector collects cluster state from BanyanDB nodes via gRPC and stores the data. +type Collector struct { + log *logger.Logger + closer *run.Closer + clients []*endpointClient + currentNodes map[string]*databasev1.Node + clusterTopology TopologyMap + nodeFetchedCh chan struct{} + podName string + addrs []string + interval time.Duration + mu sync.RWMutex +} + +// NewCollector creates a new cluster state collector. 
+func NewCollector(log *logger.Logger, addrs []string, interval time.Duration, podName string) *Collector { + return &Collector{ + log: log, + addrs: addrs, + interval: interval, + podName: podName, + closer: run.NewCloser(0), + nodeFetchedCh: make(chan struct{}), + clients: make([]*endpointClient, 0, len(addrs)), + currentNodes: make(map[string]*databasev1.Node), + clusterTopology: TopologyMap{}, + } +} + +// Start starts the cluster state collector. +func (c *Collector) Start(ctx context.Context) error { + if c.closer.Closed() { + return fmt.Errorf("cluster state collector has been stopped and cannot be restarted") + } + if !c.closer.AddRunning() { + return fmt.Errorf("cluster state collector is already started and cannot be started again") + } + for _, addr := range c.addrs { + conn, connErr := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if connErr != nil { + c.closeAllConnections() + c.closer.Done() + return fmt.Errorf("failed to create gRPC connection to %s: %w", addr, connErr) + } + client := &endpointClient{ + conn: conn, + clusterClient: databasev1.NewClusterStateServiceClient(conn), + nodeQueryClient: databasev1.NewNodeQueryServiceClient(conn), + addr: addr, + } + c.clients = append(c.clients, client) + } + go c.collectLoop(ctx) + c.log.Info().Strs("addrs", c.addrs).Dur("interval", c.interval).Msg("Cluster state collector started") + return nil +} + +func (c *Collector) closeAllConnections() { + for _, client := range c.clients { + if client.conn != nil { + if closeErr := client.conn.Close(); closeErr != nil { + c.log.Warn().Str("addr", client.addr).Err(closeErr).Msg("Error closing gRPC connection") + } + } + } + c.clients = nil +} + +// Stop stops the cluster state collector. 
+func (c *Collector) Stop() { + c.closer.CloseThenWait() + c.mu.Lock() + c.closeAllConnections() + c.mu.Unlock() + c.log.Info().Msg("Cluster state collector stopped") +} + +// GetCurrentNodes returns all current node info, keyed by endpoint address. +func (c *Collector) GetCurrentNodes() map[string]*databasev1.Node { + c.mu.RLock() + defer c.mu.RUnlock() + result := make(map[string]*databasev1.Node, len(c.currentNodes)) + for addr, node := range c.currentNodes { + result[addr] = node + } + return result +} + +// GetClusterTopology returns the aggregated cluster topology across all endpoints. +func (c *Collector) GetClusterTopology() TopologyMap { + c.mu.RLock() + defer c.mu.RUnlock() + return c.clusterTopology +} + +// SetClusterTopology sets the cluster topology data, primarily for tests. +func (c *Collector) SetClusterTopology(topology TopologyMap) { + c.mu.Lock() + c.clusterTopology = topology + c.mu.Unlock() +} + +// GetNodeInfo returns the processed node role and labels from the first available current node. +func (c *Collector) GetNodeInfo() (nodeRole string, nodeLabels map[string]string) { + c.mu.RLock() + defer c.mu.RUnlock() + for _, node := range c.currentNodes { + if node != nil { + return NodeRoleFromNode(node), node.Labels + } + } + return NodeRoleFromNode(nil), nil +} + +// WaitForNodeFetched waits until the current node info has been fetched. 
+func (c *Collector) WaitForNodeFetched(ctx context.Context) error { + select { + case <-c.nodeFetchedCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (c *Collector) collectLoop(ctx context.Context) { + defer c.closer.Done() + c.pollCurrentNode(ctx) + close(c.nodeFetchedCh) + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + c.collectClusterState(ctx) + for { + select { + case <-c.closer.CloseNotify(): + return + case <-ticker.C: + c.collectClusterState(ctx) + } + } +} + +func (c *Collector) pollCurrentNode(ctx context.Context) { + nodes := c.fetchCurrentNodes(ctx) + if len(nodes) == 0 { + c.log.Error().Msg("Failed to fetch current node info from any endpoint") + return + } + c.updateCurrentNodes(nodes) +} + +func (c *Collector) updateCurrentNodes(nodes map[string]*databasev1.Node) { + if len(nodes) == 0 { + c.log.Warn().Msg("Received empty current nodes map") + return + } + c.mu.Lock() + c.currentNodes = nodes + c.mu.Unlock() + c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes from all endpoints") +} + +func (c *Collector) updateClusterStates(states map[string]*databasev1.GetClusterStateResponse) { + if len(states) == 0 { + c.log.Warn().Msg("Received empty cluster states map") + return + } + c.mu.Lock() + currentNodesCopy := make(map[string]*databasev1.Node, len(c.currentNodes)) + for addr, node := range c.currentNodes { + currentNodesCopy[addr] = node Review Comment: Use "proto.Clone" to make a deep copy of the node. ########## fodc/agent/internal/cluster/collector.go: ########## @@ -0,0 +1,474 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package cluster provides cluster state collection functionality for FODC agent. +package cluster + +import ( + "context" + "fmt" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +const ( + maxRetries = 3 + initialBackoff = 100 * time.Millisecond + maxBackoff = 5 * time.Second + nodeInfoFetchTimeout = 30 * time.Second +) + +// TopologyMap represents processed cluster data for a single endpoint. +type TopologyMap struct { + Nodes []*databasev1.Node `json:"nodes"` + Calls []*fodcv1.Call `json:"calls"` +} + +// endpointClient holds gRPC connection and clients for a single endpoint. +type endpointClient struct { + conn *grpc.ClientConn + clusterClient databasev1.ClusterStateServiceClient + nodeQueryClient databasev1.NodeQueryServiceClient + addr string +} + +// Collector collects cluster state from BanyanDB nodes via gRPC and stores the data. +type Collector struct { + log *logger.Logger + closer *run.Closer + clients []*endpointClient + currentNodes map[string]*databasev1.Node + clusterTopology TopologyMap + nodeFetchedCh chan struct{} + podName string + addrs []string + interval time.Duration + mu sync.RWMutex +} + +// NewCollector creates a new cluster state collector. 
+func NewCollector(log *logger.Logger, addrs []string, interval time.Duration, podName string) *Collector { + return &Collector{ + log: log, + addrs: addrs, + interval: interval, + podName: podName, + closer: run.NewCloser(0), + nodeFetchedCh: make(chan struct{}), + clients: make([]*endpointClient, 0, len(addrs)), + currentNodes: make(map[string]*databasev1.Node), + clusterTopology: TopologyMap{}, + } +} + +// Start starts the cluster state collector. +func (c *Collector) Start(ctx context.Context) error { + if c.closer.Closed() { + return fmt.Errorf("cluster state collector has been stopped and cannot be restarted") + } + if !c.closer.AddRunning() { + return fmt.Errorf("cluster state collector is already started and cannot be started again") + } + for _, addr := range c.addrs { + conn, connErr := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if connErr != nil { + c.closeAllConnections() + c.closer.Done() + return fmt.Errorf("failed to create gRPC connection to %s: %w", addr, connErr) + } + client := &endpointClient{ + conn: conn, + clusterClient: databasev1.NewClusterStateServiceClient(conn), + nodeQueryClient: databasev1.NewNodeQueryServiceClient(conn), + addr: addr, + } + c.clients = append(c.clients, client) + } + go c.collectLoop(ctx) + c.log.Info().Strs("addrs", c.addrs).Dur("interval", c.interval).Msg("Cluster state collector started") + return nil +} + +func (c *Collector) closeAllConnections() { + for _, client := range c.clients { + if client.conn != nil { + if closeErr := client.conn.Close(); closeErr != nil { + c.log.Warn().Str("addr", client.addr).Err(closeErr).Msg("Error closing gRPC connection") + } + } + } + c.clients = nil +} + +// Stop stops the cluster state collector. 
+func (c *Collector) Stop() { + c.closer.CloseThenWait() + c.mu.Lock() + c.closeAllConnections() + c.mu.Unlock() + c.log.Info().Msg("Cluster state collector stopped") +} + +// GetCurrentNodes returns all current node info, keyed by endpoint address. +func (c *Collector) GetCurrentNodes() map[string]*databasev1.Node { + c.mu.RLock() + defer c.mu.RUnlock() + result := make(map[string]*databasev1.Node, len(c.currentNodes)) + for addr, node := range c.currentNodes { + result[addr] = node + } + return result +} + +// GetClusterTopology returns the aggregated cluster topology across all endpoints. +func (c *Collector) GetClusterTopology() TopologyMap { + c.mu.RLock() + defer c.mu.RUnlock() + return c.clusterTopology Review Comment: Return a deep copy of the `TopologyMap`. Returning the struct by value only copies the `Nodes` and `Calls` slice headers, so callers still share the underlying slices and message pointers with the collector. Use proto.Clone to copy each protobuf message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
