This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 27d4793db test(fodc): fix the flaky test so it no longer depends on
timing-sensitive manual injection for cluster topology (#1006)
27d4793db is described below
commit 27d4793dbfd0fe2055213e1e1c9e6dc7e2039499
Author: Fine0830 <[email protected]>
AuthorDate: Thu Mar 12 11:35:35 2026 +0800
test(fodc): fix the flaky test so it no longer depends on timing-sensitive
manual injection for cluster topology (#1006)
---
.../internal/integration/cluster_collector_test.go | 40 ++++++++++++----
fodc/agent/testhelper/flightrecorder.go | 51 ++++++++++++++++++--
fodc/proxy/internal/cluster/manager_test.go | 23 ++++++++-
fodc/proxy/internal/grpc/service.go | 18 +++++++
.../cluster_topology_integration_test.go | 55 ++++++++++------------
5 files changed, 143 insertions(+), 44 deletions(-)
diff --git a/fodc/agent/internal/integration/cluster_collector_test.go
b/fodc/agent/internal/integration/cluster_collector_test.go
index 94c1116a6..a301ee38b 100644
--- a/fodc/agent/internal/integration/cluster_collector_test.go
+++ b/fodc/agent/internal/integration/cluster_collector_test.go
@@ -89,12 +89,20 @@ var _ = Describe("Cluster Collector Integration", func() {
err := collector.Start(collectionCtx)
Expect(err).NotTo(HaveOccurred())
- // Wait a bit for cluster topology to be collected
- time.Sleep(2 * time.Second)
+ waitCtx, waitCancel :=
context.WithTimeout(collectionCtx, 30*time.Second)
+ defer waitCancel()
+
+ err = collector.WaitForNodeFetched(waitCtx)
+ Expect(err).NotTo(HaveOccurred())
+
+ Eventually(func() int {
+ topology := collector.GetClusterTopology()
+ return len(topology.Nodes)
+ }, 10*time.Second,
200*time.Millisecond).Should(BeNumerically(">", 0), "Expected initial cluster
topology collection to produce nodes")
topology := collector.GetClusterTopology()
Expect(topology).NotTo(BeNil())
- Expect(len(topology.Nodes)).To(BeNumerically(">=", 0))
+ Expect(topology.Nodes).NotTo(BeEmpty())
})
It("should handle node role determination correctly", func() {
@@ -139,8 +147,8 @@ var _ = Describe("Cluster Collector Integration", func() {
Expect(nodes).To(BeEmpty(), "No nodes should be fetched
with invalid address")
})
- It("should handle connection timeouts", func() {
- // Use a valid address format but non-existent server
+ It("should handle invalid port addresses gracefully", func() {
+ // Use an invalid port value to verify address handling
remains graceful.
collector = cluster.NewCollector(testLogger,
[]string{"127.0.0.1:99999"}, 1*time.Second, "test-pod")
err := collector.Start(collectionCtx)
@@ -165,17 +173,31 @@ var _ = Describe("Cluster Collector Integration", func() {
err := collector.Start(collectionCtx)
Expect(err).NotTo(HaveOccurred())
- // Wait for initial collection
- time.Sleep(3 * time.Second)
+ waitCtx, waitCancel :=
context.WithTimeout(collectionCtx, 30*time.Second)
+ defer waitCancel()
+
+ err = collector.WaitForNodeFetched(waitCtx)
+ Expect(err).NotTo(HaveOccurred())
+
+ Eventually(func() int {
+ topology := collector.GetClusterTopology()
+ return len(topology.Nodes)
+ }, 10*time.Second,
200*time.Millisecond).Should(BeNumerically(">", 0), "Expected initial topology
collection to produce nodes")
initialTopology := collector.GetClusterTopology()
Expect(initialTopology).NotTo(BeNil())
+ Expect(initialTopology.Nodes).NotTo(BeEmpty())
+ initialNodeCount := len(initialTopology.Nodes)
+ initialCallCount := len(initialTopology.Calls)
- // Wait for another collection cycle
- time.Sleep(3 * time.Second)
+ Consistently(func() bool {
+ topology := collector.GetClusterTopology()
+ return len(topology.Nodes) >= initialNodeCount
&& len(topology.Calls) >= initialCallCount
+ }, 3*time.Second, 200*time.Millisecond).Should(BeTrue())
updatedTopology := collector.GetClusterTopology()
Expect(updatedTopology).NotTo(BeNil())
+ Expect(updatedTopology.Nodes).NotTo(BeEmpty())
// The topology should be updated
})
})
diff --git a/fodc/agent/testhelper/flightrecorder.go
b/fodc/agent/testhelper/flightrecorder.go
index 0983e4f6d..931f53e2a 100644
--- a/fodc/agent/testhelper/flightrecorder.go
+++ b/fodc/agent/testhelper/flightrecorder.go
@@ -23,6 +23,9 @@ import (
"fmt"
"time"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ fodcv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+ agentcluster
"github.com/apache/skywalking-banyandb/fodc/agent/internal/cluster"
"github.com/apache/skywalking-banyandb/fodc/agent/internal/flightrecorder"
agentmetrics
"github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
"github.com/apache/skywalking-banyandb/fodc/agent/internal/proxy"
@@ -128,6 +131,33 @@ func NewProxyClient(
reconnectInterval time.Duration,
flightRecorder interface{},
logger *logger.Logger,
+) *proxy.Client {
+ return NewProxyClientWithCollector(
+ proxyAddr,
+ nodeRole,
+ podName,
+ containerNames,
+ labels,
+ heartbeatInterval,
+ reconnectInterval,
+ flightRecorder,
+ nil,
+ logger,
+ )
+}
+
+// NewProxyClientWithCollector creates a new ProxyClient instance for testing
with optional cluster collector.
+func NewProxyClientWithCollector(
+ proxyAddr string,
+ nodeRole string,
+ podName string,
+ containerNames []string,
+ labels map[string]string,
+ heartbeatInterval time.Duration,
+ reconnectInterval time.Duration,
+ flightRecorder interface{},
+ collector *agentcluster.Collector,
+ logger *logger.Logger,
) *proxy.Client {
frTyped, ok := flightRecorder.(*flightrecorder.FlightRecorder)
if !ok {
@@ -143,7 +173,7 @@ func NewProxyClient(
heartbeatInterval,
reconnectInterval,
frTyped,
- nil,
+ collector,
logger,
)
}
@@ -151,6 +181,7 @@ func NewProxyClient(
// ProxyClientWrapper wraps ProxyClient methods for testing.
type ProxyClientWrapper struct {
client *proxy.Client
+ collector *agentcluster.Collector
ctx context.Context
connMgrActive bool
}
@@ -198,6 +229,18 @@ func (w *ProxyClientWrapper) Disconnect() error {
return w.client.Disconnect()
}
+// SetClusterTopology seeds the wrapped client's cluster collector for testing.
+func (w *ProxyClientWrapper) SetClusterTopology(nodes []*databasev1.Node,
calls []*fodcv1.Call) error {
+ if w == nil || w.collector == nil {
+ return fmt.Errorf("cluster collector not available")
+ }
+ w.collector.SetClusterTopology(agentcluster.TopologyMap{
+ Nodes: nodes,
+ Calls: calls,
+ })
+ return nil
+}
+
// NewProxyClientWrapper creates a wrapped ProxyClient for testing.
func NewProxyClientWrapper(
proxyAddr string,
@@ -210,7 +253,8 @@ func NewProxyClientWrapper(
flightRecorder interface{},
logger *logger.Logger,
) *ProxyClientWrapper {
- client := NewProxyClient(
+ collector := &agentcluster.Collector{}
+ client := NewProxyClientWithCollector(
proxyAddr,
nodeRole,
podName,
@@ -219,10 +263,11 @@ func NewProxyClientWrapper(
heartbeatInterval,
reconnectInterval,
flightRecorder,
+ collector,
logger,
)
if client == nil {
return nil
}
- return &ProxyClientWrapper{client: client}
+ return &ProxyClientWrapper{client: client, collector: collector}
}
diff --git a/fodc/proxy/internal/cluster/manager_test.go
b/fodc/proxy/internal/cluster/manager_test.go
index ccd96f81e..4359d44d6 100644
--- a/fodc/proxy/internal/cluster/manager_test.go
+++ b/fodc/proxy/internal/cluster/manager_test.go
@@ -70,6 +70,15 @@ func (m *mockRequestSender) SetRequestError(agentID string,
err error) {
m.requestErrors[agentID] = err
}
+func containsAgent(agents []string, target string) bool {
+ for _, agentID := range agents {
+ if agentID == target {
+ return true
+ }
+ }
+ return false
+}
+
func initTestLogger(t *testing.T) *logger.Logger {
t.Helper()
initErr := logger.Init(logger.Logging{Env: "dev", Level: "debug"})
@@ -196,10 +205,20 @@ func TestManager_CollectClusterTopology_MultipleAgents(t
*testing.T) {
done <- mgr.CollectClusterTopology(context.Background())
}()
- // Give it a moment to set up channels and request data
- time.Sleep(50 * time.Millisecond)
+ // Wait for collection channels to be registered and requests to be
sent.
+ require.Eventually(t, func() bool {
+ mgr.collectingMu.RLock()
+ defer mgr.collectingMu.RUnlock()
+ _, hasAgent1 := mgr.collecting[agentID1]
+ _, hasAgent2 := mgr.collecting[agentID2]
+ return hasAgent1 && hasAgent2
+ }, time.Second, 10*time.Millisecond)
// Verify requests were made
+ require.Eventually(t, func() bool {
+ requestedAgents := mockSender.GetRequestedAgents()
+ return containsAgent(requestedAgents, agentID1) &&
containsAgent(requestedAgents, agentID2)
+ }, time.Second, 10*time.Millisecond)
requestedAgents := mockSender.GetRequestedAgents()
assert.Contains(t, requestedAgents, agentID1)
assert.Contains(t, requestedAgents, agentID2)
diff --git a/fodc/proxy/internal/grpc/service.go
b/fodc/proxy/internal/grpc/service.go
index f47c0e17f..04ea1c2e2 100644
--- a/fodc/proxy/internal/grpc/service.go
+++ b/fodc/proxy/internal/grpc/service.go
@@ -76,6 +76,13 @@ func (ac *agentConnection) setClusterStateStream(stream
fodcv1.FODCService_Strea
ac.clusterStateStream = stream
}
+// hasClusterStateStream reports whether the cluster topology stream is
established.
+func (ac *agentConnection) hasClusterStateStream() bool {
+ ac.mu.RLock()
+ defer ac.mu.RUnlock()
+ return ac.clusterStateStream != nil
+}
+
// sendMetricsRequest sends a metrics request to the agent via the metrics
stream.
func (ac *agentConnection) sendMetricsRequest(resp
*fodcv1.StreamMetricsResponse) error {
ac.mu.RLock()
@@ -135,6 +142,17 @@ func NewFODCService(
}
}
+// HasClusterStateStream reports whether the given agent currently has a
cluster topology stream.
+func (s *FODCService) HasClusterStateStream(agentID string) bool {
+ s.connectionsMu.RLock()
+ defer s.connectionsMu.RUnlock()
+ conn, exists := s.connections[agentID]
+ if !exists || conn == nil {
+ return false
+ }
+ return conn.hasClusterStateStream()
+}
+
// RegisterAgent handles bi-directional agent registration stream.
func (s *FODCService) RegisterAgent(stream
fodcv1.FODCService_RegisterAgentServer) error {
ctx, cancel := context.WithCancel(stream.Context())
diff --git
a/fodc/proxy/internal/integration/cluster_topology_integration_test.go
b/fodc/proxy/internal/integration/cluster_topology_integration_test.go
index 904265ad2..3bf6dd4eb 100644
--- a/fodc/proxy/internal/integration/cluster_topology_integration_test.go
+++ b/fodc/proxy/internal/integration/cluster_topology_integration_test.go
@@ -19,6 +19,7 @@ package integration_test
import (
"context"
+ "encoding/json"
"fmt"
"net"
"net/http"
@@ -58,11 +59,13 @@ var _ = Describe("Cluster Topology Integration", func() {
agentCancel1 context.CancelFunc
agentCtx2 context.Context
agentCancel2 context.CancelFunc
+ httpClient *http.Client
testLogger *logger.Logger
)
BeforeEach(func() {
testLogger = logger.GetLogger("test", "integration")
+ httpClient = &http.Client{Timeout: 500 * time.Millisecond}
heartbeatTimeout := 5 * time.Second
cleanupTimeout := 10 * time.Second
@@ -90,11 +93,11 @@ var _ = Describe("Cluster Topology Integration", func() {
Expect(httpListenErr).NotTo(HaveOccurred())
proxyHTTPAddr = httpListener.Addr().String()
_ = httpListener.Close()
- httpServer = api.NewServer(metricsAggregator, nil,
agentRegistry, testLogger)
+ httpServer = api.NewServer(metricsAggregator, clusterManager,
agentRegistry, testLogger)
Expect(httpServer.Start(proxyHTTPAddr, 10*time.Second,
10*time.Second)).To(Succeed())
Eventually(func() error {
- resp, err := http.Get(fmt.Sprintf("http://%s/health",
proxyHTTPAddr))
+ resp, err :=
httpClient.Get(fmt.Sprintf("http://%s/health", proxyHTTPAddr))
if err != nil {
return err
}
@@ -205,34 +208,26 @@ var _ = Describe("Cluster Topology Integration", func() {
},
}
- // Start collection in a goroutine - this sets up channels
- ctx := context.Background()
- done := make(chan *cluster.TopologyMap)
- channelsReady := make(chan struct{})
- go func() {
- // Wait a bit to ensure channels are set up before we
start waiting
- time.Sleep(100 * time.Millisecond)
- close(channelsReady)
- done <- clusterManager.CollectClusterTopology(ctx)
- }()
-
- // Wait for channels to be set up
- <-channelsReady
-
- // Update topology for both agents - this sends to collection
channels
- clusterManager.UpdateClusterTopology(agents[0].AgentID,
testTopology1)
- clusterManager.UpdateClusterTopology(agents[1].AgentID,
testTopology2)
-
- // Wait for collection to complete
- var topology *cluster.TopologyMap
- select {
- case topology = <-done:
- case <-time.After(5 * time.Second):
- Fail("Collection timed out")
- }
-
- Expect(topology).NotTo(BeNil())
- Expect(len(topology.Nodes)).To(BeNumerically(">=", 2))
+ Eventually(func(g Gomega) {
+
g.Expect(grpcService.HasClusterStateStream(agents[0].AgentID)).To(BeTrue())
+
g.Expect(grpcService.HasClusterStateStream(agents[1].AgentID)).To(BeTrue())
+ }, "1s", "10ms").Should(Succeed())
+
+ Expect(proxyClient1.SetClusterTopology(testTopology1.Nodes,
testTopology1.Calls)).To(Succeed())
+ Expect(proxyClient2.SetClusterTopology(testTopology2.Nodes,
testTopology2.Calls)).To(Succeed())
+
+ var topology cluster.TopologyMap
+ Eventually(func(g Gomega) {
+ resp, err :=
httpClient.Get(fmt.Sprintf("http://%s/cluster/topology", proxyHTTPAddr))
+ g.Expect(err).NotTo(HaveOccurred())
+ defer resp.Body.Close()
+ g.Expect(resp.StatusCode).To(Equal(http.StatusOK))
+
+ var result cluster.TopologyMap
+
g.Expect(json.NewDecoder(resp.Body).Decode(&result)).To(Succeed())
+ g.Expect(len(result.Nodes)).To(BeNumerically(">=", 2))
+ topology = result
+ }, "5s", "100ms").Should(Succeed())
// Check that both agents are present
nodeNames := make(map[string]bool)