This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d3aca8487 Make FODC agent test more stable (#1033)
d3aca8487 is described below

commit d3aca8487d208b894871dde524d415ed2d3f1bab
Author: mrproliu <[email protected]>
AuthorDate: Mon Mar 30 19:57:26 2026 +0800

    Make FODC agent test more stable (#1033)
---
 fodc/proxy/internal/api/server_test.go   |  2 +-
 fodc/proxy/internal/lifecycle/manager.go | 81 ++++++++++++++++++++------------
 2 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/fodc/proxy/internal/api/server_test.go b/fodc/proxy/internal/api/server_test.go
index e6182bb73..81ff419db 100644
--- a/fodc/proxy/internal/api/server_test.go
+++ b/fodc/proxy/internal/api/server_test.go
@@ -690,7 +690,7 @@ func (m *mockLifecycleDataRequester) RequestLifecycleData(agentID string) error
        }
        if data, exists := m.dataByAgent[agentID]; exists && m.lifecycleMgr != nil {
                podName := m.podByAgent[agentID]
-               go m.lifecycleMgr.UpdateLifecycle(agentID, podName, data)
+               m.lifecycleMgr.UpdateLifecycle(agentID, podName, data)
        }
        return nil
 }
diff --git a/fodc/proxy/internal/lifecycle/manager.go b/fodc/proxy/internal/lifecycle/manager.go
index 66cb9ce81..0c042e4d4 100644
--- a/fodc/proxy/internal/lifecycle/manager.go
+++ b/fodc/proxy/internal/lifecycle/manager.go
@@ -106,13 +106,31 @@ func (m *Manager) UpdateLifecycle(agentID, podName string, data *fodcv1.Lifecycl
        }
 }
 
+func (m *Manager) registerSession(agentID string, collectChs map[string]chan *agentLifecycleData) {
+       collectCh := make(chan *agentLifecycleData, 1)
+       collectChs[agentID] = collectCh
+       m.collectingMu.Lock()
+       m.collecting[agentID] = collectCh
+       m.collectingMu.Unlock()
+}
+
+func (m *Manager) unregisterSession(agentID string, collectChs map[string]chan *agentLifecycleData) {
+       m.collectingMu.Lock()
+       if ch, exists := m.collecting[agentID]; exists {
+               close(ch)
+               delete(m.collecting, agentID)
+       }
+       m.collectingMu.Unlock()
+       delete(collectChs, agentID)
+}
+
 // CollectLifecycle requests and collects lifecycle data from all agents.
 func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
        m.collectingOp.Lock()
        defer m.collectingOp.Unlock()
 
-       if m.registry == nil {
-               m.log.Info().Msg("CollectLifecycle: registry is nil, returning empty")
+       if m.registry == nil || m.grpcService == nil {
+               m.log.Info().Msg("CollectLifecycle: registry or grpcService is nil, returning empty")
                return emptyResult()
        }
 
@@ -125,48 +143,47 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
        m.log.Info().Int("agent_count", len(agents)).Msg("CollectLifecycle: starting collection")
 
        collectChs := make(map[string]chan *agentLifecycleData)
+       defer m.cleanupSessions(collectChs)
 
-       defer func() {
-               m.collectingMu.Lock()
-               for agentID, collectCh := range collectChs {
-                       close(collectCh)
-                       delete(m.collecting, agentID)
-               }
-               m.collectingMu.Unlock()
-       }()
+       requestedCount := m.requestAllAgents(ctx, agents, collectChs)
+       m.log.Info().Int("requested", requestedCount).Int("waiting_for", len(collectChs)).
+               Msg("CollectLifecycle: requests sent, waiting for responses")
+
+       allData := m.waitForResponses(ctx, collectChs)
+       m.log.Info().Int("responses_with_data", len(allData)).
+               Msg("CollectLifecycle: all responses collected, aggregating")
+
+       return m.buildInspectionResult(allData)
+}
 
+func (m *Manager) requestAllAgents(ctx context.Context, agents []*registry.AgentInfo,
+       collectChs map[string]chan *agentLifecycleData,
+) int {
        requestedCount := 0
        for _, agentInfo := range agents {
                select {
                case <-ctx.Done():
                        m.log.Info().Msg("CollectLifecycle: context canceled during request phase")
-                       return emptyResult()
+                       return requestedCount
                default:
                }
-               if m.grpcService == nil {
-                       m.log.Info().Str("agent_id", agentInfo.AgentID).Msg("CollectLifecycle: grpcService is nil, skipping request")
-                       continue
-               }
+               m.registerSession(agentInfo.AgentID, collectChs)
                if err := m.grpcService.RequestLifecycleData(agentInfo.AgentID); err != nil {
                        m.log.Info().Err(err).
                                Str("agent_id", agentInfo.AgentID).
                                Msg("Agent does not support lifecycle stream, skipping")
+                       m.unregisterSession(agentInfo.AgentID, collectChs)
                        continue
                }
-               collectCh := make(chan *agentLifecycleData, 1)
-               collectChs[agentInfo.AgentID] = collectCh
-               m.collectingMu.Lock()
-               m.collecting[agentInfo.AgentID] = collectCh
-               m.collectingMu.Unlock()
                requestedCount++
        }
+       return requestedCount
+}
 
-       m.log.Info().Int("requested", requestedCount).Int("waiting_for", len(collectChs)).Msg("CollectLifecycle: requests sent, waiting for responses")
-
+func (m *Manager) waitForResponses(ctx context.Context, collectChs map[string]chan *agentLifecycleData) []*agentLifecycleData {
        allData := make([]*agentLifecycleData, 0, len(collectChs))
        var dataMu sync.Mutex
        var wg sync.WaitGroup
-
        for agentID, collectCh := range collectChs {
                wg.Add(1)
                go func(id string, ch chan *agentLifecycleData) {
@@ -175,9 +192,7 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
                        defer agentCancel()
                        select {
                        case <-agentCtx.Done():
-                               m.log.Warn().
-                                       Str("agent_id", id).
-                                       Msg("Timeout waiting for lifecycle data from agent")
+                               m.log.Warn().Str("agent_id", id).Msg("Timeout waiting for lifecycle data from agent")
                        case data := <-ch:
                                if data != nil {
                                        m.log.Info().
@@ -189,17 +204,21 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
                                        dataMu.Lock()
                                        allData = append(allData, data)
                                        dataMu.Unlock()
-                               } else {
-                                       m.log.Info().Str("agent_id", id).Msg("CollectLifecycle: received nil data from agent")
                                }
                        }
                }(agentID, collectCh)
        }
-
        wg.Wait()
-       m.log.Info().Int("responses_with_data", len(allData)).Msg("CollectLifecycle: all responses collected, aggregating")
+       return allData
+}
 
-       return m.buildInspectionResult(allData)
+func (m *Manager) cleanupSessions(collectChs map[string]chan *agentLifecycleData) {
+       m.collectingMu.Lock()
+       for agentID, collectCh := range collectChs {
+               close(collectCh)
+               delete(m.collecting, agentID)
+       }
+       m.collectingMu.Unlock()
 }
 
 func (m *Manager) buildInspectionResult(allData []*agentLifecycleData) *InspectionResult {

Reply via email to