This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 27d4793db test(fodc): fix the flaky test and no longer depends on 
timing-sensitive manual injection for cluster topology (#1006)
27d4793db is described below

commit 27d4793dbfd0fe2055213e1e1c9e6dc7e2039499
Author: Fine0830 <[email protected]>
AuthorDate: Thu Mar 12 11:35:35 2026 +0800

    test(fodc): fix the flaky test and no longer depends on timing-sensitive 
manual injection for cluster topology (#1006)
---
 .../internal/integration/cluster_collector_test.go | 40 ++++++++++++----
 fodc/agent/testhelper/flightrecorder.go            | 51 ++++++++++++++++++--
 fodc/proxy/internal/cluster/manager_test.go        | 23 ++++++++-
 fodc/proxy/internal/grpc/service.go                | 18 +++++++
 .../cluster_topology_integration_test.go           | 55 ++++++++++------------
 5 files changed, 143 insertions(+), 44 deletions(-)

diff --git a/fodc/agent/internal/integration/cluster_collector_test.go 
b/fodc/agent/internal/integration/cluster_collector_test.go
index 94c1116a6..a301ee38b 100644
--- a/fodc/agent/internal/integration/cluster_collector_test.go
+++ b/fodc/agent/internal/integration/cluster_collector_test.go
@@ -89,12 +89,20 @@ var _ = Describe("Cluster Collector Integration", func() {
                        err := collector.Start(collectionCtx)
                        Expect(err).NotTo(HaveOccurred())
 
-                       // Wait a bit for cluster topology to be collected
-                       time.Sleep(2 * time.Second)
+                       waitCtx, waitCancel := 
context.WithTimeout(collectionCtx, 30*time.Second)
+                       defer waitCancel()
+
+                       err = collector.WaitForNodeFetched(waitCtx)
+                       Expect(err).NotTo(HaveOccurred())
+
+                       Eventually(func() int {
+                               topology := collector.GetClusterTopology()
+                               return len(topology.Nodes)
+                       }, 10*time.Second, 
200*time.Millisecond).Should(BeNumerically(">", 0), "Expected initial cluster 
topology collection to produce nodes")
 
                        topology := collector.GetClusterTopology()
                        Expect(topology).NotTo(BeNil())
-                       Expect(len(topology.Nodes)).To(BeNumerically(">=", 0))
+                       Expect(topology.Nodes).NotTo(BeEmpty())
                })
 
                It("should handle node role determination correctly", func() {
@@ -139,8 +147,8 @@ var _ = Describe("Cluster Collector Integration", func() {
                        Expect(nodes).To(BeEmpty(), "No nodes should be fetched 
with invalid address")
                })
 
-               It("should handle connection timeouts", func() {
-                       // Use a valid address format but non-existent server
+               It("should handle invalid port addresses gracefully", func() {
+                       // Use an invalid port value to verify address handling 
remains graceful.
                        collector = cluster.NewCollector(testLogger, 
[]string{"127.0.0.1:99999"}, 1*time.Second, "test-pod")
 
                        err := collector.Start(collectionCtx)
@@ -165,17 +173,31 @@ var _ = Describe("Cluster Collector Integration", func() {
                        err := collector.Start(collectionCtx)
                        Expect(err).NotTo(HaveOccurred())
 
-                       // Wait for initial collection
-                       time.Sleep(3 * time.Second)
+                       waitCtx, waitCancel := 
context.WithTimeout(collectionCtx, 30*time.Second)
+                       defer waitCancel()
+
+                       err = collector.WaitForNodeFetched(waitCtx)
+                       Expect(err).NotTo(HaveOccurred())
+
+                       Eventually(func() int {
+                               topology := collector.GetClusterTopology()
+                               return len(topology.Nodes)
+                       }, 10*time.Second, 
200*time.Millisecond).Should(BeNumerically(">", 0), "Expected initial topology 
collection to produce nodes")
 
                        initialTopology := collector.GetClusterTopology()
                        Expect(initialTopology).NotTo(BeNil())
+                       Expect(initialTopology.Nodes).NotTo(BeEmpty())
+                       initialNodeCount := len(initialTopology.Nodes)
+                       initialCallCount := len(initialTopology.Calls)
 
-                       // Wait for another collection cycle
-                       time.Sleep(3 * time.Second)
+                       Consistently(func() bool {
+                               topology := collector.GetClusterTopology()
+                               return len(topology.Nodes) >= initialNodeCount 
&& len(topology.Calls) >= initialCallCount
+                       }, 3*time.Second, 200*time.Millisecond).Should(BeTrue())
 
                        updatedTopology := collector.GetClusterTopology()
                        Expect(updatedTopology).NotTo(BeNil())
+                       Expect(updatedTopology.Nodes).NotTo(BeEmpty())
                        // The topology should be updated
                })
        })
diff --git a/fodc/agent/testhelper/flightrecorder.go 
b/fodc/agent/testhelper/flightrecorder.go
index 0983e4f6d..931f53e2a 100644
--- a/fodc/agent/testhelper/flightrecorder.go
+++ b/fodc/agent/testhelper/flightrecorder.go
@@ -23,6 +23,9 @@ import (
        "fmt"
        "time"
 
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       agentcluster 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/cluster"
        
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
        agentmetrics 
"github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
        "github.com/apache/skywalking-banyandb/fodc/agent/internal/proxy"
@@ -128,6 +131,33 @@ func NewProxyClient(
        reconnectInterval time.Duration,
        flightRecorder interface{},
        logger *logger.Logger,
+) *proxy.Client {
+       return NewProxyClientWithCollector(
+               proxyAddr,
+               nodeRole,
+               podName,
+               containerNames,
+               labels,
+               heartbeatInterval,
+               reconnectInterval,
+               flightRecorder,
+               nil,
+               logger,
+       )
+}
+
+// NewProxyClientWithCollector creates a new ProxyClient instance for testing 
with optional cluster collector.
+func NewProxyClientWithCollector(
+       proxyAddr string,
+       nodeRole string,
+       podName string,
+       containerNames []string,
+       labels map[string]string,
+       heartbeatInterval time.Duration,
+       reconnectInterval time.Duration,
+       flightRecorder interface{},
+       collector *agentcluster.Collector,
+       logger *logger.Logger,
 ) *proxy.Client {
        frTyped, ok := flightRecorder.(*flightrecorder.FlightRecorder)
        if !ok {
@@ -143,7 +173,7 @@ func NewProxyClient(
                heartbeatInterval,
                reconnectInterval,
                frTyped,
-               nil,
+               collector,
                logger,
        )
 }
@@ -151,6 +181,7 @@ func NewProxyClient(
 // ProxyClientWrapper wraps ProxyClient methods for testing.
 type ProxyClientWrapper struct {
        client        *proxy.Client
+       collector     *agentcluster.Collector
        ctx           context.Context
        connMgrActive bool
 }
@@ -198,6 +229,18 @@ func (w *ProxyClientWrapper) Disconnect() error {
        return w.client.Disconnect()
 }
 
+// SetClusterTopology seeds the wrapped client's cluster collector for testing.
+func (w *ProxyClientWrapper) SetClusterTopology(nodes []*databasev1.Node, 
calls []*fodcv1.Call) error {
+       if w == nil || w.collector == nil {
+               return fmt.Errorf("cluster collector not available")
+       }
+       w.collector.SetClusterTopology(agentcluster.TopologyMap{
+               Nodes: nodes,
+               Calls: calls,
+       })
+       return nil
+}
+
 // NewProxyClientWrapper creates a wrapped ProxyClient for testing.
 func NewProxyClientWrapper(
        proxyAddr string,
@@ -210,7 +253,8 @@ func NewProxyClientWrapper(
        flightRecorder interface{},
        logger *logger.Logger,
 ) *ProxyClientWrapper {
-       client := NewProxyClient(
+       collector := &agentcluster.Collector{}
+       client := NewProxyClientWithCollector(
                proxyAddr,
                nodeRole,
                podName,
@@ -219,10 +263,11 @@ func NewProxyClientWrapper(
                heartbeatInterval,
                reconnectInterval,
                flightRecorder,
+               collector,
                logger,
        )
        if client == nil {
                return nil
        }
-       return &ProxyClientWrapper{client: client}
+       return &ProxyClientWrapper{client: client, collector: collector}
 }
diff --git a/fodc/proxy/internal/cluster/manager_test.go 
b/fodc/proxy/internal/cluster/manager_test.go
index ccd96f81e..4359d44d6 100644
--- a/fodc/proxy/internal/cluster/manager_test.go
+++ b/fodc/proxy/internal/cluster/manager_test.go
@@ -70,6 +70,15 @@ func (m *mockRequestSender) SetRequestError(agentID string, 
err error) {
        m.requestErrors[agentID] = err
 }
 
+func containsAgent(agents []string, target string) bool {
+       for _, agentID := range agents {
+               if agentID == target {
+                       return true
+               }
+       }
+       return false
+}
+
 func initTestLogger(t *testing.T) *logger.Logger {
        t.Helper()
        initErr := logger.Init(logger.Logging{Env: "dev", Level: "debug"})
@@ -196,10 +205,20 @@ func TestManager_CollectClusterTopology_MultipleAgents(t 
*testing.T) {
                done <- mgr.CollectClusterTopology(context.Background())
        }()
 
-       // Give it a moment to set up channels and request data
-       time.Sleep(50 * time.Millisecond)
+       // Wait for collection channels to be registered and requests to be 
sent.
+       require.Eventually(t, func() bool {
+               mgr.collectingMu.RLock()
+               defer mgr.collectingMu.RUnlock()
+               _, hasAgent1 := mgr.collecting[agentID1]
+               _, hasAgent2 := mgr.collecting[agentID2]
+               return hasAgent1 && hasAgent2
+       }, time.Second, 10*time.Millisecond)
 
        // Verify requests were made
+       require.Eventually(t, func() bool {
+               requestedAgents := mockSender.GetRequestedAgents()
+               return containsAgent(requestedAgents, agentID1) && 
containsAgent(requestedAgents, agentID2)
+       }, time.Second, 10*time.Millisecond)
        requestedAgents := mockSender.GetRequestedAgents()
        assert.Contains(t, requestedAgents, agentID1)
        assert.Contains(t, requestedAgents, agentID2)
diff --git a/fodc/proxy/internal/grpc/service.go 
b/fodc/proxy/internal/grpc/service.go
index f47c0e17f..04ea1c2e2 100644
--- a/fodc/proxy/internal/grpc/service.go
+++ b/fodc/proxy/internal/grpc/service.go
@@ -76,6 +76,13 @@ func (ac *agentConnection) setClusterStateStream(stream 
fodcv1.FODCService_Strea
        ac.clusterStateStream = stream
 }
 
+// hasClusterStateStream reports whether the cluster topology stream is 
established.
+func (ac *agentConnection) hasClusterStateStream() bool {
+       ac.mu.RLock()
+       defer ac.mu.RUnlock()
+       return ac.clusterStateStream != nil
+}
+
 // sendMetricsRequest sends a metrics request to the agent via the metrics 
stream.
 func (ac *agentConnection) sendMetricsRequest(resp 
*fodcv1.StreamMetricsResponse) error {
        ac.mu.RLock()
@@ -135,6 +142,17 @@ func NewFODCService(
        }
 }
 
+// HasClusterStateStream reports whether the given agent currently has a 
cluster topology stream.
+func (s *FODCService) HasClusterStateStream(agentID string) bool {
+       s.connectionsMu.RLock()
+       defer s.connectionsMu.RUnlock()
+       conn, exists := s.connections[agentID]
+       if !exists || conn == nil {
+               return false
+       }
+       return conn.hasClusterStateStream()
+}
+
 // RegisterAgent handles bi-directional agent registration stream.
 func (s *FODCService) RegisterAgent(stream 
fodcv1.FODCService_RegisterAgentServer) error {
        ctx, cancel := context.WithCancel(stream.Context())
diff --git 
a/fodc/proxy/internal/integration/cluster_topology_integration_test.go 
b/fodc/proxy/internal/integration/cluster_topology_integration_test.go
index 904265ad2..3bf6dd4eb 100644
--- a/fodc/proxy/internal/integration/cluster_topology_integration_test.go
+++ b/fodc/proxy/internal/integration/cluster_topology_integration_test.go
@@ -19,6 +19,7 @@ package integration_test
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "net"
        "net/http"
@@ -58,11 +59,13 @@ var _ = Describe("Cluster Topology Integration", func() {
                agentCancel1      context.CancelFunc
                agentCtx2         context.Context
                agentCancel2      context.CancelFunc
+               httpClient        *http.Client
                testLogger        *logger.Logger
        )
 
        BeforeEach(func() {
                testLogger = logger.GetLogger("test", "integration")
+               httpClient = &http.Client{Timeout: 500 * time.Millisecond}
 
                heartbeatTimeout := 5 * time.Second
                cleanupTimeout := 10 * time.Second
@@ -90,11 +93,11 @@ var _ = Describe("Cluster Topology Integration", func() {
                Expect(httpListenErr).NotTo(HaveOccurred())
                proxyHTTPAddr = httpListener.Addr().String()
                _ = httpListener.Close()
-               httpServer = api.NewServer(metricsAggregator, nil, 
agentRegistry, testLogger)
+               httpServer = api.NewServer(metricsAggregator, clusterManager, 
agentRegistry, testLogger)
                Expect(httpServer.Start(proxyHTTPAddr, 10*time.Second, 
10*time.Second)).To(Succeed())
 
                Eventually(func() error {
-                       resp, err := http.Get(fmt.Sprintf("http://%s/health", 
proxyHTTPAddr))
+                       resp, err := 
httpClient.Get(fmt.Sprintf("http://%s/health", proxyHTTPAddr))
                        if err != nil {
                                return err
                        }
@@ -205,34 +208,26 @@ var _ = Describe("Cluster Topology Integration", func() {
                        },
                }
 
-               // Start collection in a goroutine - this sets up channels
-               ctx := context.Background()
-               done := make(chan *cluster.TopologyMap)
-               channelsReady := make(chan struct{})
-               go func() {
-                       // Wait a bit to ensure channels are set up before we 
start waiting
-                       time.Sleep(100 * time.Millisecond)
-                       close(channelsReady)
-                       done <- clusterManager.CollectClusterTopology(ctx)
-               }()
-
-               // Wait for channels to be set up
-               <-channelsReady
-
-               // Update topology for both agents - this sends to collection 
channels
-               clusterManager.UpdateClusterTopology(agents[0].AgentID, 
testTopology1)
-               clusterManager.UpdateClusterTopology(agents[1].AgentID, 
testTopology2)
-
-               // Wait for collection to complete
-               var topology *cluster.TopologyMap
-               select {
-               case topology = <-done:
-               case <-time.After(5 * time.Second):
-                       Fail("Collection timed out")
-               }
-
-               Expect(topology).NotTo(BeNil())
-               Expect(len(topology.Nodes)).To(BeNumerically(">=", 2))
+               Eventually(func(g Gomega) {
+                       
g.Expect(grpcService.HasClusterStateStream(agents[0].AgentID)).To(BeTrue())
+                       
g.Expect(grpcService.HasClusterStateStream(agents[1].AgentID)).To(BeTrue())
+               }, "1s", "10ms").Should(Succeed())
+
+               Expect(proxyClient1.SetClusterTopology(testTopology1.Nodes, 
testTopology1.Calls)).To(Succeed())
+               Expect(proxyClient2.SetClusterTopology(testTopology2.Nodes, 
testTopology2.Calls)).To(Succeed())
+
+               var topology cluster.TopologyMap
+               Eventually(func(g Gomega) {
+                       resp, err := 
httpClient.Get(fmt.Sprintf("http://%s/cluster/topology", proxyHTTPAddr))
+                       g.Expect(err).NotTo(HaveOccurred())
+                       defer resp.Body.Close()
+                       g.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+                       var result cluster.TopologyMap
+                       
g.Expect(json.NewDecoder(resp.Body).Decode(&result)).To(Succeed())
+                       g.Expect(len(result.Nodes)).To(BeNumerically(">=", 2))
+                       topology = result
+               }, "5s", "100ms").Should(Succeed())
 
                // Check that both agents are present
                nodeNames := make(map[string]bool)

Reply via email to