This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 76e4e21f2 fix(fodc): namespace node labels as node_* and resolve them
live in the agent (#1160)
76e4e21f2 is described below
commit 76e4e21f226cb73cffcd780421d9916bf7ecad2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 12:52:28 2026 +0800
fix(fodc): namespace node labels as node_* and resolve them live in the
agent (#1160)
---
CHANGES.md | 1 +
fodc/agent/internal/cluster/collector.go | 11 ++-
fodc/agent/internal/cluster/collector_test.go | 14 +++-
fodc/agent/internal/cmd/root.go | 10 ++-
fodc/agent/internal/metrics/parse.go | 30 +++++++-
fodc/agent/internal/metrics/parse_test.go | 43 +++++++++--
fodc/agent/internal/proxy/conn_manager.go | 12 ++-
fodc/agent/internal/watchdog/watchdog.go | 101 +++++++++++++++++++++----
fodc/agent/internal/watchdog/watchdog_test.go | 52 +++++++++++++
fodc/proxy/cmd/proxy/main.go | 2 +-
fodc/proxy/internal/metrics/aggregator.go | 26 +++++--
fodc/proxy/internal/metrics/aggregator_test.go | 69 ++++++++++++++++-
12 files changed, 333 insertions(+), 38 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5f9f312f5..79c40d6a6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@ Release Notes.
### Bug Fixes
+- FODC: expose node labels reliably and without collisions. The proxy overlaid
each agent's node labels onto every metric under their raw keys, so a data
node's `type=hot/warm/cold` tier label overwrote the merge metrics' own
intrinsic `type="file"/"mem"` dimension on any node whose agent had resolved
its labels. Two fixes: (1) the agent now stamps its node labels **live** from
the cluster collector (which keeps resolving after startup) under a `node_`
prefix per metric — so the tier rel [...]
- Close BanyanDB merge write-path durability gap that allowed torn parts to be
created by a crash between data write and metadata commit. Metadata files
(`metadata.json` for trace/measure/stream, `manifest.json` for sidx, plus
`traceID.filter` and `tag.type`) now go through a new `WriteAtomic` (write-tmp
+ fsync + rename + fsync-dir) sequence; data writers (`seqWriter.Close`,
`localFileSystem.Write`) now propagate fdatasync errors instead of silently
dropping them. `mustOpenFilePart` / ` [...]
- Fix bydbctl command tests using global stdout capture, which caused
race-enabled runs to corrupt captured command output.
- Use `topic` instead of `session_id` as the Prometheus label on liaison
`queue_sub` chunk-ordering counters to avoid unbounded metric cardinality.
diff --git a/fodc/agent/internal/cluster/collector.go
b/fodc/agent/internal/cluster/collector.go
index 42f60e3a0..19850c9a3 100644
--- a/fodc/agent/internal/cluster/collector.go
+++ b/fodc/agent/internal/cluster/collector.go
@@ -240,6 +240,11 @@ func (c *Collector) collectLoop(ctx context.Context) {
case <-c.closer.CloseNotify():
return
case <-ticker.C:
+ // Re-poll the current node every tick, not just at
startup: a node may not have
+ // assumed its role (or populated its labels) when the
agent first polls, and
+ // without a refresh GetNodeInfo would stay
ROLE_UNSPECIFIED with no labels for
+ // the life of the process, leaving node_role/node_type
unresolved on slow nodes.
+ c.pollCurrentNode(ctx)
c.collectClusterState(ctx)
}
}
@@ -536,10 +541,14 @@ func NodeRoleFromNode(node *databasev1.Node) string {
}
// GenerateClusterStateAddrs generates cluster state gRPC addresses from the
given ports.
+// The local node is addressed via the IPv4 loopback (127.0.0.1) rather than
"localhost":
+// "localhost" can resolve to the IPv6 loopback (::1), and gRPC's dial then
fails with
+// "cannot assign requested address" in pods without an IPv6 loopback, leaving
the node's
+// role/labels unresolved.
func GenerateClusterStateAddrs(ports []string) []string {
addrs := make([]string, 0, len(ports))
for _, port := range ports {
- addrs = append(addrs, fmt.Sprintf("localhost:%s", port))
+ addrs = append(addrs, fmt.Sprintf("127.0.0.1:%s", port))
}
return addrs
}
diff --git a/fodc/agent/internal/cluster/collector_test.go
b/fodc/agent/internal/cluster/collector_test.go
index 71ab0850b..e663a56c3 100644
--- a/fodc/agent/internal/cluster/collector_test.go
+++ b/fodc/agent/internal/cluster/collector_test.go
@@ -348,15 +348,15 @@ func TestCollector_GetNodeInfo_NoNode(t *testing.T) {
assert.Nil(t, nodeLabels)
}
-func TestGenerateLifecycleAddrs(t *testing.T) {
+func TestGenerateClusterStateAddrsTable(t *testing.T) {
tests := []struct {
name string
ports []string
expected []string
}{
{"empty ports", []string{}, []string{}},
- {"single port", []string{"17914"}, []string{"localhost:17914"}},
- {"multiple ports", []string{"17914", "17915", "17916"},
[]string{"localhost:17914", "localhost:17915", "localhost:17916"}},
+ {"single port", []string{"17914"}, []string{"127.0.0.1:17914"}},
+ {"multiple ports", []string{"17914", "17915", "17916"},
[]string{"127.0.0.1:17914", "127.0.0.1:17915", "127.0.0.1:17916"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -945,3 +945,11 @@ func TestProcessClusterStates_ComplexTopology(t
*testing.T) {
assert.Contains(t, callMap, expectedCall, "Expected call %s to
be present", expectedCall)
}
}
+
+func TestGenerateClusterStateAddrs_UsesIPv4Loopback(t *testing.T) {
+ // Must use 127.0.0.1, not "localhost": the latter can resolve to ::1
and fail to dial
+ // in pods without an IPv6 loopback, leaving the node role/labels
unresolved.
+ assert.Equal(t, []string{"127.0.0.1:17912", "127.0.0.1:17914"},
+ GenerateClusterStateAddrs([]string{"17912", "17914"}))
+ assert.Empty(t, GenerateClusterStateAddrs(nil))
+}
diff --git a/fodc/agent/internal/cmd/root.go b/fodc/agent/internal/cmd/root.go
index 9576e9427..73ede77ef 100644
--- a/fodc/agent/internal/cmd/root.go
+++ b/fodc/agent/internal/cmd/root.go
@@ -278,6 +278,11 @@ func runFODC(_ *cobra.Command, _ []string) error {
}
wd := watchdog.NewWatchdogWithConfig(fr, metricsEndpoints,
metricsPollInterval, nodeRole, podName, containerNames)
+ if clusterCollector != nil {
+ // Stamp the node role and labels live from the collector,
which keeps resolving after
+ // startup, instead of the one-shot snapshot captured above.
+ wd.SetNodeInfoProvider(clusterCollector.GetNodeInfo)
+ }
stopKTM := initializeKTM(ctx, log, fr)
@@ -434,7 +439,10 @@ func startProxyClient(ctx context.Context, log
*logger.Logger, fr *flightrecorde
}
var lifecycleCollector *lifecycle.Collector
if lifecyclePort > 0 {
- grpcAddr := fmt.Sprintf("localhost:%d", lifecyclePort)
+ // Use the IPv4 loopback rather than "localhost" so the dial
never resolves to the
+ // IPv6 loopback (::1), which fails with "cannot assign
requested address" in pods
+ // without an IPv6 loopback.
+ grpcAddr := fmt.Sprintf("127.0.0.1:%d", lifecyclePort)
lifecycleCollector = lifecycle.NewCollector(log, grpcAddr,
lifecycleReportDir, lifecycleCacheTTL)
log.Info().Str("grpc_addr", grpcAddr).Msg("Lifecycle collector
initialized")
}
diff --git a/fodc/agent/internal/metrics/parse.go
b/fodc/agent/internal/metrics/parse.go
index 974e960d5..0cd477bd9 100644
--- a/fodc/agent/internal/metrics/parse.go
+++ b/fodc/agent/internal/metrics/parse.go
@@ -74,6 +74,12 @@ func (mk MetricKey) String() string {
return fmt.Sprintf("%s{%s}", mk.Name, strings.Join(labelParts, ","))
}
+const (
+ labelNodeRole = "node_role"
+ labelPodName = "pod_name"
+ labelContainerName = "container_name"
+)
+
var (
helpLineRegex = regexp.MustCompile(`^#\s+HELP\s+(\S+)\s+(.+)$`)
// metricLineRegex matches metric lines:
metric_name{label1="value1",label2="value2"} value.
@@ -123,6 +129,15 @@ func ResolveMetricType(typeMap map[string]string, name
string) string {
// ParseWithAgentLabels parses Prometheus text format metrics and returns
structured RawMetric objects.
func ParseWithAgentLabels(text string, nodeRole, podName, containerName
string) ([]RawMetric, error) {
+ return ParseWithNodeLabels(text, nodeRole, podName, containerName, nil)
+}
+
+// ParseWithNodeLabels is ParseWithAgentLabels plus the node's own labels
(e.g. the data-node
+// tier "type"). Each node label is stamped under a "node_" prefix (so "type"
becomes
+// "node_type"), which keeps it from ever colliding with a metric-intrinsic
label of the same
+// name (e.g. the merge "type"). pod_name and container_name are skipped
because they are
+// already stamped as first-class labels.
+func ParseWithNodeLabels(text string, nodeRole, podName, containerName string,
nodeLabels map[string]string) ([]RawMetric, error) {
// Build authoritative type map using expfmt.
// TextToMetricFamilies may return a non-nil error AND a non-empty
families map
// simultaneously (e.g. for "unexpected end of input stream" on
trailing newline).
@@ -172,22 +187,31 @@ func ParseWithAgentLabels(text string, nodeRole, podName,
containerName string)
// Add agent identity labels if provided
if nodeRole != "" {
labels = append(labels, Label{
- Name: "node_role",
+ Name: labelNodeRole,
Value: nodeRole,
})
}
if podName != "" {
labels = append(labels, Label{
- Name: "pod_name",
+ Name: labelPodName,
Value: podName,
})
}
if containerName != "" {
labels = append(labels, Label{
- Name: "container_name",
+ Name: labelContainerName,
Value: containerName,
})
}
+ for nodeLabelKey, nodeLabelValue := range nodeLabels {
+ if nodeLabelValue == "" || nodeLabelKey == labelPodName
|| nodeLabelKey == labelContainerName {
+ continue
+ }
+ labels = append(labels, Label{
+ Name: "node_" + nodeLabelKey,
+ Value: nodeLabelValue,
+ })
+ }
value, parseErr := strconv.ParseFloat(valueStr, 64)
if parseErr != nil {
diff --git a/fodc/agent/internal/metrics/parse_test.go
b/fodc/agent/internal/metrics/parse_test.go
index efb4adff4..6dcbddd6d 100644
--- a/fodc/agent/internal/metrics/parse_test.go
+++ b/fodc/agent/internal/metrics/parse_test.go
@@ -1151,15 +1151,15 @@ http_requests_total{method="GET"} 100`
if label.Value == "localhost" {
hasInstanceLabel = true
}
- case "node_role":
+ case labelNodeRole:
if label.Value == testNodeRole {
hasNodeRole = true
}
- case "pod_name":
+ case labelPodName:
if label.Value == "test-pod" {
hasPodName = true
}
- case "container_name":
+ case labelContainerName:
if label.Value == testContainerName {
hasContainerName = true
}
@@ -1186,15 +1186,15 @@ http_requests_total{method="GET"} 100`
if label.Value == "GET" {
hasMethodLabel = true
}
- case "node_role":
+ case labelNodeRole:
if label.Value == testNodeRole {
hasNodeRole = true
}
- case "pod_name":
+ case labelPodName:
if label.Value == "test-pod" {
hasPodName = true
}
- case "container_name":
+ case labelContainerName:
if label.Value == testContainerName {
hasContainerName = true
}
@@ -1231,10 +1231,10 @@ func TestParse_WithPartialAgentIdentityLabels(t
*testing.T) {
hasNodeRole := false
hasContainerName := false
for _, label := range metrics[0].Labels {
- if label.Name == "node_role" && label.Value == testNodeRole {
+ if label.Name == labelNodeRole && label.Value == testNodeRole {
hasNodeRole = true
}
- if label.Name == "container_name" && label.Value ==
testContainerName {
+ if label.Name == labelContainerName && label.Value ==
testContainerName {
hasContainerName = true
}
}
@@ -1253,3 +1253,30 @@ func TestParse_WithoutAgentIdentityLabels(t *testing.T) {
assert.Equal(t, "instance", metrics[0].Labels[0].Name)
assert.Equal(t, "localhost", metrics[0].Labels[0].Value)
}
+
+func TestParseWithNodeLabels_NamespacesAndSkips(t *testing.T) {
+ // A merge metric with an intrinsic "type" label, scraped from a hot
data node.
+ text := `banyandb_measure_total_merged_parts{type="file"} 3`
+
+ metrics, err := ParseWithNodeLabels(text, "ROLE_DATA",
"demo-banyandb-data-hot-0", "data",
+ map[string]string{"type": "hot", labelPodName:
"demo-banyandb-data-hot-0", labelContainerName: "data", "empty": ""})
+
+ require.NoError(t, err)
+ require.Len(t, metrics, 1)
+
+ got := make(map[string]string)
+ for _, l := range metrics[0].Labels {
+ got[l.Name] = l.Value
+ }
+ // Intrinsic type is preserved; the node tier is namespaced to
node_type (no collision).
+ assert.Equal(t, "file", got["type"])
+ assert.Equal(t, "hot", got["node_type"])
+ // First-class identity labels are not re-applied under the node_
prefix, and empty values are skipped.
+ assert.Equal(t, "demo-banyandb-data-hot-0", got[labelPodName])
+ _, hasNodePodName := got["node_pod_name"]
+ assert.False(t, hasNodePodName)
+ _, hasNodeContainer := got["node_container_name"]
+ assert.False(t, hasNodeContainer)
+ _, hasNodeEmpty := got["node_empty"]
+ assert.False(t, hasNodeEmpty)
+}
diff --git a/fodc/agent/internal/proxy/conn_manager.go
b/fodc/agent/internal/proxy/conn_manager.go
index 8fb58f302..1debc5322 100644
--- a/fodc/agent/internal/proxy/conn_manager.go
+++ b/fodc/agent/internal/proxy/conn_manager.go
@@ -34,6 +34,10 @@ import (
const (
connManagerMaxRetryInterval = 60 * time.Second
connManagerMaxRetries = 3
+ // maxProxyCallMsgSize bounds gRPC messages exchanged with the proxy. A
data node's metrics
+ // push can exceed the gRPC 4MB default, which would otherwise reject it
with ResourceExhausted,
+ // causing an endless reconnect storm. Keep in sync with the proxy's
--grpc-max-msg-size.
+ maxProxyCallMsgSize = 32 << 20 // 32MB
)
// connEventType represents the type of connection event.
@@ -248,7 +252,13 @@ func (cm *connManager) doConnect(ctx context.Context)
connResult {
default:
}
- conn, dialErr := grpc.NewClient(cm.proxyAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ conn, dialErr := grpc.NewClient(cm.proxyAddr,
+
grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithDefaultCallOptions(
+ grpc.MaxCallRecvMsgSize(maxProxyCallMsgSize),
+ grpc.MaxCallSendMsgSize(maxProxyCallMsgSize),
+ ),
+ )
if dialErr == nil {
cm.logger.Info().Str("proxy_addr",
cm.proxyAddr).Int("attempt", attempt).Msg("Connected to FODC Proxy")
return connResult{conn: conn}
diff --git a/fodc/agent/internal/watchdog/watchdog.go
b/fodc/agent/internal/watchdog/watchdog.go
index 080322b91..346a3a95d 100644
--- a/fodc/agent/internal/watchdog/watchdog.go
+++ b/fodc/agent/internal/watchdog/watchdog.go
@@ -26,6 +26,7 @@ import (
"sync"
"time"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/panicdiag"
@@ -42,26 +43,36 @@ const (
maxRetries = 3
initialBackoff = 100 * time.Millisecond
maxBackoff = 5 * time.Second
+ // nodeResolveGracePeriod bounds how long the watchdog defers recording
while the local
+ // node's role is still unresolved. Deferring avoids emitting metrics
stamped with an
+ // unresolved identity that later turns into duplicate (ghost) series
once the role
+ // resolves; after the grace period a never-resolving node still has
its metrics collected.
+ nodeResolveGracePeriod = 5 * time.Minute
)
+// roleUnspecified is the string form of an unresolved node role.
+var roleUnspecified =
databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)]
+
// Watchdog periodically polls metrics from BanyanDB and forwards them to
Flight Recorder.
type Watchdog struct {
- recorder MetricsRecorder
- log *logger.Logger
- ctx context.Context
- cancel context.CancelFunc
- client *http.Client
-
+ startTime time.Time
+ ctx context.Context
+ recorder MetricsRecorder
+ nodeInfo func() (role string, labels map[string]string)
+ log *logger.Logger
+ cancel context.CancelFunc
+ client *http.Client
+ resolvedLabels map[string]string
nodeRole string
podName string
+ resolvedRole string
urls []string
containerNames []string
-
- interval time.Duration
- retryBackoff time.Duration
- mu sync.RWMutex
- wg sync.WaitGroup
- isRunning bool
+ wg sync.WaitGroup
+ interval time.Duration
+ retryBackoff time.Duration
+ mu sync.RWMutex
+ isRunning bool
}
// NewWatchdogWithConfig creates a new Watchdog instance with specified
configuration.
@@ -74,12 +85,71 @@ func NewWatchdogWithConfig(recorder MetricsRecorder, urls
[]string, interval tim
ctx: ctx,
cancel: cancel,
retryBackoff: initialBackoff,
+ startTime: time.Now(),
nodeRole: nodeRole,
podName: podName,
containerNames: containerNames,
}
}
+// SetNodeInfoProvider supplies a live source of the node's role and labels,
used to stamp
+// metrics at scrape time. This avoids freezing the role/labels at a startup
snapshot, which
+// is unreliable when the local node has not yet resolved its role by the time the
agent starts.
+func (w *Watchdog) SetNodeInfoProvider(fn func() (role string, labels
map[string]string)) {
+ w.mu.Lock()
+ w.nodeInfo = fn
+ w.mu.Unlock()
+}
+
+// resolveNodeInfo returns the current node role and labels from the live
provider, falling back
+// to the static role captured at construction when no provider is set. The
first resolved
+// identity is cached and "sticks": if the provider later regresses to an
unresolved role
+// (ROLE_UNSPECIFIED), the cached resolved identity is returned instead.
Without this, a regression
+// would cause the flight recorder to buffer a duplicate (ghost) series under
the unresolved
+// identity, which is never evicted.
+func (w *Watchdog) resolveNodeInfo() (role string, labels map[string]string) {
+ w.mu.RLock()
+ fn := w.nodeInfo
+ cachedRole, cachedLabels := w.resolvedRole, w.resolvedLabels
+ w.mu.RUnlock()
+ if fn == nil {
+ return w.nodeRole, nil
+ }
+ role, labels = fn()
+ if role != "" && role != roleUnspecified {
+ w.mu.Lock()
+ w.resolvedRole, w.resolvedLabels = role, labels
+ w.mu.Unlock()
+ return role, labels
+ }
+ if cachedRole != "" {
+ return cachedRole, cachedLabels
+ }
+ return role, labels
+}
+
+// nodeReadyToRecord reports whether the watchdog should record metrics this
cycle. While a live
+// node-info provider is configured but the node role has not resolved yet,
recording is deferred
+// (up to nodeResolveGracePeriod) so the flight recorder never buffers metrics
under an unresolved
+// identity — those would otherwise linger as duplicate (ghost) series once
the role resolves and
+// the identity labels change. Once resolved, the gate opens permanently; if
the role never
+// resolves, the grace period ensures the node's metrics are still recorded.
+func (w *Watchdog) nodeReadyToRecord() bool {
+ w.mu.RLock()
+ fn := w.nodeInfo
+ start := w.startTime
+ w.mu.RUnlock()
+ if fn == nil {
+ return true
+ }
+ // resolveNodeInfo caches and sticks to the first resolved identity, so
once the role has
+ // resolved this stays true even if the live provider briefly regresses
to unspecified.
+ if role, _ := w.resolveNodeInfo(); role != "" && role !=
roleUnspecified {
+ return true
+ }
+ return time.Since(start) >= nodeResolveGracePeriod
+}
+
// Name returns the name of the watchdog service.
func (w *Watchdog) Name() string {
return "watchdog"
@@ -222,6 +292,10 @@ func (w *Watchdog) GracefulStop() {
// It returns the enriched context so the caller can update the
recovery-visible
// ctx pointer, making breadcrumbs visible if a panic occurs.
func (w *Watchdog) pollAndForward(ctx context.Context) (context.Context,
error) {
+ if !w.nodeReadyToRecord() {
+ w.log.Debug().Msg("Deferring metrics recording until the node
role resolves (avoids duplicate series)")
+ return ctx, nil
+ }
ctx = panicdiag.WithBreadcrumb(ctx, "poll watchdog metrics",
"fodc-watchdog", map[string]string{
"endpoint_count": fmt.Sprintf("%d", len(w.urls)),
})
@@ -328,7 +402,8 @@ func (w *Watchdog) pollMetricsFromEndpoint(ctx
context.Context, url string, cont
currentBackoff = w.exponentialBackoff(currentBackoff)
continue
}
- parsedMetrics, parseErr :=
metrics.ParseWithAgentLabels(string(body), w.nodeRole, w.podName, containerName)
+ nodeRole, nodeLabels := w.resolveNodeInfo()
+ parsedMetrics, parseErr :=
metrics.ParseWithNodeLabels(string(body), nodeRole, w.podName, containerName,
nodeLabels)
if parseErr != nil {
lastErr = fmt.Errorf("failed to parse metrics: %w",
parseErr)
currentBackoff = w.exponentialBackoff(currentBackoff)
diff --git a/fodc/agent/internal/watchdog/watchdog_test.go
b/fodc/agent/internal/watchdog/watchdog_test.go
index a8ac7e807..76a746a3d 100644
--- a/fodc/agent/internal/watchdog/watchdog_test.go
+++ b/fodc/agent/internal/watchdog/watchdog_test.go
@@ -828,3 +828,55 @@ func TestWatchdog_PollMetrics_NoEndpoints(t *testing.T) {
assert.Nil(t, rawMetrics)
assert.Contains(t, err.Error(), "no metrics endpoints configured")
}
+
+func TestNodeReadyToRecord(t *testing.T) {
+ // No live provider configured: always ready (preserves behavior
without a collector).
+ noProvider := NewWatchdogWithConfig(nil, nil, time.Second, "ROLE_DATA",
"p", []string{"data"})
+ assert.True(t, noProvider.nodeReadyToRecord())
+
+ // Provider reports an unresolved role within the grace period: defer
recording.
+ unresolved := NewWatchdogWithConfig(nil, nil, time.Second, "", "p",
[]string{"data"})
+ unresolved.SetNodeInfoProvider(func() (string, map[string]string) {
return roleUnspecified, nil })
+ assert.False(t, unresolved.nodeReadyToRecord())
+
+ // Once a resolved role is seen the gate opens and stays open even if
it regresses.
+ regress := false
+ flips := NewWatchdogWithConfig(nil, nil, time.Second, "", "p",
[]string{"data"})
+ flips.SetNodeInfoProvider(func() (string, map[string]string) {
+ if regress {
+ return roleUnspecified, nil
+ }
+ return "ROLE_DATA", map[string]string{"type": "cold"}
+ })
+ assert.True(t, flips.nodeReadyToRecord())
+ regress = true
+ assert.True(t, flips.nodeReadyToRecord(), "should stay ready once
resolved")
+
+ // Unresolved but past the grace period: record anyway so metrics are
never permanently dropped.
+ stuck := NewWatchdogWithConfig(nil, nil, time.Second, "", "p",
[]string{"data"})
+ stuck.SetNodeInfoProvider(func() (string, map[string]string) { return
roleUnspecified, nil })
+ stuck.startTime = time.Now().Add(-2 * nodeResolveGracePeriod)
+ assert.True(t, stuck.nodeReadyToRecord(), "should record after the
grace period even if unresolved")
+}
+
+func TestResolveNodeInfoSticky(t *testing.T) {
+ regress := false
+ wd := NewWatchdogWithConfig(nil, nil, time.Second, "", "p",
[]string{"data"})
+ wd.SetNodeInfoProvider(func() (string, map[string]string) {
+ if regress {
+ return roleUnspecified, nil
+ }
+ return "ROLE_DATA", map[string]string{"type": "warm"}
+ })
+
+ role, labels := wd.resolveNodeInfo()
+ assert.Equal(t, "ROLE_DATA", role)
+ assert.Equal(t, "warm", labels["type"])
+
+ // Once resolved, a regression to an unresolved role must not be
returned, otherwise the
+ // flight recorder buffers a duplicate ghost series under the
unresolved identity.
+ regress = true
+ role, labels = wd.resolveNodeInfo()
+ assert.Equal(t, "ROLE_DATA", role, "must stick to the resolved role")
+ assert.Equal(t, "warm", labels["type"], "must stick to the resolved
labels")
+}
diff --git a/fodc/proxy/cmd/proxy/main.go b/fodc/proxy/cmd/proxy/main.go
index 510360204..2c36bd145 100644
--- a/fodc/proxy/cmd/proxy/main.go
+++ b/fodc/proxy/cmd/proxy/main.go
@@ -44,7 +44,7 @@ const (
defaultHeartbeatTimeout = 30 * time.Second
defaultCleanupTimeout = 5 * time.Minute
defaultMaxAgents = 1000
- defaultGRPCMaxMsgSize = 4194304 // 4MB
+ defaultGRPCMaxMsgSize = 32 << 20 // 32MB; a single data node's
metrics push can exceed the gRPC 4MB default.
defaultHTTPReadTimeout = 10 * time.Second
defaultHTTPWriteTimeout = 10 * time.Second
defaultHeartbeatInterval = 10 * time.Second
diff --git a/fodc/proxy/internal/metrics/aggregator.go
b/fodc/proxy/internal/metrics/aggregator.go
index 986352a62..3757dec7f 100644
--- a/fodc/proxy/internal/metrics/aggregator.go
+++ b/fodc/proxy/internal/metrics/aggregator.go
@@ -20,6 +20,7 @@ package metrics
import (
"context"
+ "maps"
"strings"
"sync"
"time"
@@ -35,6 +36,13 @@ const (
// maxCollectionTimeout is the maximum timeout allowed for collecting
metrics,
// preventing excessively long waits for wide time windows.
maxCollectionTimeout = 5 * time.Minute
+ // podNameLabelName and containerNameLabelName are first-class node
identity labels that are
+ // already stamped per metric, so they are not re-applied as namespaced
node labels.
+ podNameLabelName = "pod_name"
+ containerNameLabelName = "container_name"
+ // nodeLabelPrefix namespaces a node's own labels (e.g. the data-node
tier "type" becomes
+ // "node_type") so they can never collide with a metric-intrinsic label
of the same name.
+ nodeLabelPrefix = "node_"
)
// AggregatedMetric represents an aggregated metric with node metadata.
@@ -113,13 +121,21 @@ func (ma *Aggregator) ProcessMetricsFromAgent(ctx
context.Context, agentID strin
aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics))
for _, metric := range req.Metrics {
- labels := make(map[string]string)
- for key, value := range metric.Labels {
- labels[key] = value
- }
+ labels := make(map[string]string, len(metric.Labels))
+ maps.Copy(labels, metric.Labels)
+ // Overlay the agent's node labels under a "node_" prefix so
they can never collide
+ // with a metric-intrinsic label of the same name (e.g. the
merge "type"). pod_name and
+ // container_name are already first-class labels and are
skipped. A namespaced label
+ // already present (stamped per metric by the agent) is left
untouched.
for key, value := range agentInfo.Labels {
- labels[key] = value
+ if value == "" || key == podNameLabelName || key ==
containerNameLabelName {
+ continue
+ }
+ prefixed := nodeLabelPrefix + key
+ if _, exists := labels[prefixed]; !exists {
+ labels[prefixed] = value
+ }
}
var timestamp time.Time
diff --git a/fodc/proxy/internal/metrics/aggregator_test.go
b/fodc/proxy/internal/metrics/aggregator_test.go
index 678b07156..a7fb6a637 100644
--- a/fodc/proxy/internal/metrics/aggregator_test.go
+++ b/fodc/proxy/internal/metrics/aggregator_test.go
@@ -251,8 +251,11 @@ func TestProcessMetricsFromAgent_WithAgentLabels(t
*testing.T) {
case metrics := <-collectCh:
require.Equal(t, 1, len(metrics))
metric := metrics[0]
- assert.Equal(t, "prod", metric.Labels["env"])
- assert.Equal(t, "us-east", metric.Labels["zone"])
+ // Custom node labels are exposed under the node_ prefix.
+ assert.Equal(t, "prod", metric.Labels["node_env"])
+ assert.Equal(t, "us-east", metric.Labels["node_zone"])
+ // The metric's own intrinsic labels and first-class identity
labels are untouched.
+ assert.Equal(t, "heap", metric.Labels["type"])
assert.Equal(t, "test", metric.Labels["pod_name"])
assert.Equal(t, "master", metric.Labels["container_name"])
case <-time.After(1 * time.Second):
@@ -818,3 +821,65 @@ func TestMatchesAddress_PortMismatch(t *testing.T) {
assert.False(t, result)
}
+
+func collectOneMetric(t *testing.T, aggregator *Aggregator, agentID string,
agentInfo *registry.AgentInfo,
+ req *fodcv1.StreamMetricsRequest,
+) *AggregatedMetric {
+ t.Helper()
+ aggregator.collectingMu.Lock()
+ collectCh := make(chan []*AggregatedMetric, 1)
+ aggregator.collecting[agentID] = collectCh
+ aggregator.collectingMu.Unlock()
+
+ require.NoError(t,
aggregator.ProcessMetricsFromAgent(context.Background(), agentID, agentInfo,
req))
+
+ select {
+ case got := <-collectCh:
+ require.Len(t, got, 1)
+ return got[0]
+ case <-time.After(time.Second):
+ t.Fatal("timeout waiting for metrics")
+ return nil
+ }
+}
+
+func TestProcessMetricsFromAgent_NamespacesNodeLabels(t *testing.T) {
+ aggregator, testRegistry, _ := newTestAggregator(t)
+ // The agent reports node labels: the tier ("type") plus pod_name
(which is already a
+ // first-class label and must not be re-applied as node_pod_name).
+ agentID := createTestAgent(t, testRegistry,
"demo-banyandb-data-warm-1", "datanode-warm",
+ map[string]string{"type": "warm", "pod_name":
"demo-banyandb-data-warm-1"})
+ agentInfo, getErr := testRegistry.GetAgentByID(agentID)
+ require.NoError(t, getErr)
+
+ req :=
createTestStreamMetricsRequest("banyandb_queue_sub_total_msg_received", 5,
map[string]string{
+ "pod_name": "demo-banyandb-data-warm-1",
+ "topic": "v1:stream-query",
+ }, nil)
+
+ metric := collectOneMetric(t, aggregator, agentID, agentInfo, req)
+ assert.Equal(t, "warm", metric.Labels["node_type"], "tier exposed as
node_type")
+ _, hasRawType := metric.Labels["type"]
+ assert.False(t, hasRawType, "node label must not appear under the raw
key")
+ _, hasNodePodName := metric.Labels["node_pod_name"]
+ assert.False(t, hasNodePodName, "pod_name must not be namespaced")
+}
+
+func TestProcessMetricsFromAgent_NodeLabelDoesNotClobberMetricLabel(t
*testing.T) {
+ aggregator, testRegistry, _ := newTestAggregator(t)
+ // The agent reports a node "type" label (the data-node storage tier).
+ agentID := createTestAgent(t, testRegistry, "demo-banyandb-data-hot-0",
"datanode-hot", map[string]string{"type": "hot"})
+ agentInfo, getErr := testRegistry.GetAgentByID(agentID)
+ require.NoError(t, getErr)
+
+ // The metric carries its own intrinsic "type" label (e.g. a merge part
type).
+ req :=
createTestStreamMetricsRequest("banyandb_measure_total_merged_parts", 3,
map[string]string{
+ "pod_name": "demo-banyandb-data-hot-0",
+ "type": "file",
+ }, nil)
+
+ metric := collectOneMetric(t, aggregator, agentID, agentInfo, req)
+ // The metric's intrinsic type is preserved; the node tier lands on
node_type, no collision.
+ assert.Equal(t, "file", metric.Labels["type"])
+ assert.Equal(t, "hot", metric.Labels["node_type"])
+}