This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new d3aca8487 Make FODC agent test more stable (#1033)
d3aca8487 is described below
commit d3aca8487d208b894871dde524d415ed2d3f1bab
Author: mrproliu <[email protected]>
AuthorDate: Mon Mar 30 19:57:26 2026 +0800
Make FODC agent test more stable (#1033)
---
fodc/proxy/internal/api/server_test.go | 2 +-
fodc/proxy/internal/lifecycle/manager.go | 81 ++++++++++++++++++++------------
2 files changed, 51 insertions(+), 32 deletions(-)
diff --git a/fodc/proxy/internal/api/server_test.go b/fodc/proxy/internal/api/server_test.go
index e6182bb73..81ff419db 100644
--- a/fodc/proxy/internal/api/server_test.go
+++ b/fodc/proxy/internal/api/server_test.go
@@ -690,7 +690,7 @@ func (m *mockLifecycleDataRequester) RequestLifecycleData(agentID string) error
}
if data, exists := m.dataByAgent[agentID]; exists && m.lifecycleMgr !=
nil {
podName := m.podByAgent[agentID]
- go m.lifecycleMgr.UpdateLifecycle(agentID, podName, data)
+ m.lifecycleMgr.UpdateLifecycle(agentID, podName, data)
}
return nil
}
diff --git a/fodc/proxy/internal/lifecycle/manager.go b/fodc/proxy/internal/lifecycle/manager.go
index 66cb9ce81..0c042e4d4 100644
--- a/fodc/proxy/internal/lifecycle/manager.go
+++ b/fodc/proxy/internal/lifecycle/manager.go
@@ -106,13 +106,31 @@ func (m *Manager) UpdateLifecycle(agentID, podName string, data *fodcv1.Lifecycl
}
}
+func (m *Manager) registerSession(agentID string, collectChs map[string]chan
*agentLifecycleData) {
+ collectCh := make(chan *agentLifecycleData, 1)
+ collectChs[agentID] = collectCh
+ m.collectingMu.Lock()
+ m.collecting[agentID] = collectCh
+ m.collectingMu.Unlock()
+}
+
+func (m *Manager) unregisterSession(agentID string, collectChs map[string]chan
*agentLifecycleData) {
+ m.collectingMu.Lock()
+ if ch, exists := m.collecting[agentID]; exists {
+ close(ch)
+ delete(m.collecting, agentID)
+ }
+ m.collectingMu.Unlock()
+ delete(collectChs, agentID)
+}
+
// CollectLifecycle requests and collects lifecycle data from all agents.
func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
m.collectingOp.Lock()
defer m.collectingOp.Unlock()
- if m.registry == nil {
- m.log.Info().Msg("CollectLifecycle: registry is nil, returning
empty")
+ if m.registry == nil || m.grpcService == nil {
+ m.log.Info().Msg("CollectLifecycle: registry or grpcService is
nil, returning empty")
return emptyResult()
}
@@ -125,48 +143,47 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
m.log.Info().Int("agent_count", len(agents)).Msg("CollectLifecycle:
starting collection")
collectChs := make(map[string]chan *agentLifecycleData)
+ defer m.cleanupSessions(collectChs)
- defer func() {
- m.collectingMu.Lock()
- for agentID, collectCh := range collectChs {
- close(collectCh)
- delete(m.collecting, agentID)
- }
- m.collectingMu.Unlock()
- }()
+ requestedCount := m.requestAllAgents(ctx, agents, collectChs)
+ m.log.Info().Int("requested", requestedCount).Int("waiting_for",
len(collectChs)).
+ Msg("CollectLifecycle: requests sent, waiting for responses")
+
+ allData := m.waitForResponses(ctx, collectChs)
+ m.log.Info().Int("responses_with_data", len(allData)).
+ Msg("CollectLifecycle: all responses collected, aggregating")
+
+ return m.buildInspectionResult(allData)
+}
+func (m *Manager) requestAllAgents(ctx context.Context, agents
[]*registry.AgentInfo,
+ collectChs map[string]chan *agentLifecycleData,
+) int {
requestedCount := 0
for _, agentInfo := range agents {
select {
case <-ctx.Done():
m.log.Info().Msg("CollectLifecycle: context canceled
during request phase")
- return emptyResult()
+ return requestedCount
default:
}
- if m.grpcService == nil {
- m.log.Info().Str("agent_id",
agentInfo.AgentID).Msg("CollectLifecycle: grpcService is nil, skipping request")
- continue
- }
+ m.registerSession(agentInfo.AgentID, collectChs)
if err :=
m.grpcService.RequestLifecycleData(agentInfo.AgentID); err != nil {
m.log.Info().Err(err).
Str("agent_id", agentInfo.AgentID).
Msg("Agent does not support lifecycle stream,
skipping")
+ m.unregisterSession(agentInfo.AgentID, collectChs)
continue
}
- collectCh := make(chan *agentLifecycleData, 1)
- collectChs[agentInfo.AgentID] = collectCh
- m.collectingMu.Lock()
- m.collecting[agentInfo.AgentID] = collectCh
- m.collectingMu.Unlock()
requestedCount++
}
+ return requestedCount
+}
- m.log.Info().Int("requested", requestedCount).Int("waiting_for",
len(collectChs)).Msg("CollectLifecycle: requests sent, waiting for responses")
-
+func (m *Manager) waitForResponses(ctx context.Context, collectChs
map[string]chan *agentLifecycleData) []*agentLifecycleData {
allData := make([]*agentLifecycleData, 0, len(collectChs))
var dataMu sync.Mutex
var wg sync.WaitGroup
-
for agentID, collectCh := range collectChs {
wg.Add(1)
go func(id string, ch chan *agentLifecycleData) {
@@ -175,9 +192,7 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
defer agentCancel()
select {
case <-agentCtx.Done():
- m.log.Warn().
- Str("agent_id", id).
- Msg("Timeout waiting for lifecycle data
from agent")
+ m.log.Warn().Str("agent_id", id).Msg("Timeout
waiting for lifecycle data from agent")
case data := <-ch:
if data != nil {
m.log.Info().
@@ -189,17 +204,21 @@ func (m *Manager) CollectLifecycle(ctx context.Context) *InspectionResult {
dataMu.Lock()
allData = append(allData, data)
dataMu.Unlock()
- } else {
- m.log.Info().Str("agent_id",
id).Msg("CollectLifecycle: received nil data from agent")
}
}
}(agentID, collectCh)
}
-
wg.Wait()
- m.log.Info().Int("responses_with_data",
len(allData)).Msg("CollectLifecycle: all responses collected, aggregating")
+ return allData
+}
- return m.buildInspectionResult(allData)
+func (m *Manager) cleanupSessions(collectChs map[string]chan
*agentLifecycleData) {
+ m.collectingMu.Lock()
+ for agentID, collectCh := range collectChs {
+ close(collectCh)
+ delete(m.collecting, agentID)
+ }
+ m.collectingMu.Unlock()
}
func (m *Manager) buildInspectionResult(allData []*agentLifecycleData)
*InspectionResult {