This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 76e4e21f2 fix(fodc): namespace node labels as node_* and resolve them live in the agent (#1160)
76e4e21f2 is described below

commit 76e4e21f226cb73cffcd780421d9916bf7ecad2c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 6 12:52:28 2026 +0800

    fix(fodc): namespace node labels as node_* and resolve them live in the agent (#1160)
---
 CHANGES.md                                     |   1 +
 fodc/agent/internal/cluster/collector.go       |  11 ++-
 fodc/agent/internal/cluster/collector_test.go  |  14 +++-
 fodc/agent/internal/cmd/root.go                |  10 ++-
 fodc/agent/internal/metrics/parse.go           |  30 +++++++-
 fodc/agent/internal/metrics/parse_test.go      |  43 +++++++++--
 fodc/agent/internal/proxy/conn_manager.go      |  12 ++-
 fodc/agent/internal/watchdog/watchdog.go       | 101 +++++++++++++++++++++----
 fodc/agent/internal/watchdog/watchdog_test.go  |  52 +++++++++++++
 fodc/proxy/cmd/proxy/main.go                   |   2 +-
 fodc/proxy/internal/metrics/aggregator.go      |  26 +++++--
 fodc/proxy/internal/metrics/aggregator_test.go |  69 ++++++++++++++++-
 12 files changed, 333 insertions(+), 38 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5f9f312f5..79c40d6a6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@ Release Notes.
 
 ### Bug Fixes
 
+- FODC: expose node labels reliably and without collisions. The proxy overlaid 
each agent's node labels onto every metric under their raw keys, so a data 
node's `type=hot/warm/cold` tier label overwrote the merge metrics' own 
intrinsic `type="file"/"mem"` dimension on any node whose agent had resolved 
its labels. Two fixes: (1) the agent now stamps its node labels **live** from 
the cluster collector (which keeps resolving after startup) under a `node_` 
prefix per metric — so the tier rel [...]
 - Close BanyanDB merge write-path durability gap that allowed torn parts to be 
created by a crash between data write and metadata commit. Metadata files 
(`metadata.json` for trace/measure/stream, `manifest.json` for sidx, plus 
`traceID.filter` and `tag.type`) now go through a new `WriteAtomic` (write-tmp 
+ fsync + rename + fsync-dir) sequence; data writers (`seqWriter.Close`, 
`localFileSystem.Write`) now propagate fdatasync errors instead of silently 
dropping them. `mustOpenFilePart` / ` [...]
 - Fix bydbctl command tests using global stdout capture, which caused 
race-enabled runs to corrupt captured command output.
 - Use `topic` instead of `session_id` as the Prometheus label on liaison 
`queue_sub` chunk-ordering counters to avoid unbounded metric cardinality.
diff --git a/fodc/agent/internal/cluster/collector.go b/fodc/agent/internal/cluster/collector.go
index 42f60e3a0..19850c9a3 100644
--- a/fodc/agent/internal/cluster/collector.go
+++ b/fodc/agent/internal/cluster/collector.go
@@ -240,6 +240,11 @@ func (c *Collector) collectLoop(ctx context.Context) {
                case <-c.closer.CloseNotify():
                        return
                case <-ticker.C:
+                       // Re-poll the current node every tick, not just at 
startup: a node may not have
+                       // assumed its role (or populated its labels) when the 
agent first polls, and
+                       // without a refresh GetNodeInfo would stay 
ROLE_UNSPECIFIED with no labels for
+                       // the life of the process, leaving node_role/node_type 
unresolved on slow nodes.
+                       c.pollCurrentNode(ctx)
                        c.collectClusterState(ctx)
                }
        }
@@ -536,10 +541,14 @@ func NodeRoleFromNode(node *databasev1.Node) string {
 }
 
 // GenerateClusterStateAddrs generates cluster state gRPC addresses from the 
given ports.
+// The local node is addressed via the IPv4 loopback (127.0.0.1) rather than 
"localhost":
+// "localhost" can resolve to the IPv6 loopback (::1), and gRPC's dial then 
fails with
+// "cannot assign requested address" in pods without an IPv6 loopback, leaving 
the node's
+// role/labels unresolved.
 func GenerateClusterStateAddrs(ports []string) []string {
        addrs := make([]string, 0, len(ports))
        for _, port := range ports {
-               addrs = append(addrs, fmt.Sprintf("localhost:%s", port))
+               addrs = append(addrs, fmt.Sprintf("127.0.0.1:%s", port))
        }
        return addrs
 }
diff --git a/fodc/agent/internal/cluster/collector_test.go b/fodc/agent/internal/cluster/collector_test.go
index 71ab0850b..e663a56c3 100644
--- a/fodc/agent/internal/cluster/collector_test.go
+++ b/fodc/agent/internal/cluster/collector_test.go
@@ -348,15 +348,15 @@ func TestCollector_GetNodeInfo_NoNode(t *testing.T) {
        assert.Nil(t, nodeLabels)
 }
 
-func TestGenerateLifecycleAddrs(t *testing.T) {
+func TestGenerateClusterStateAddrsTable(t *testing.T) {
        tests := []struct {
                name     string
                ports    []string
                expected []string
        }{
                {"empty ports", []string{}, []string{}},
-               {"single port", []string{"17914"}, []string{"localhost:17914"}},
-               {"multiple ports", []string{"17914", "17915", "17916"}, 
[]string{"localhost:17914", "localhost:17915", "localhost:17916"}},
+               {"single port", []string{"17914"}, []string{"127.0.0.1:17914"}},
+               {"multiple ports", []string{"17914", "17915", "17916"}, 
[]string{"127.0.0.1:17914", "127.0.0.1:17915", "127.0.0.1:17916"}},
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
@@ -945,3 +945,11 @@ func TestProcessClusterStates_ComplexTopology(t *testing.T) {
                assert.Contains(t, callMap, expectedCall, "Expected call %s to 
be present", expectedCall)
        }
 }
+
+func TestGenerateClusterStateAddrs_UsesIPv4Loopback(t *testing.T) {
+       // Must use 127.0.0.1, not "localhost": the latter can resolve to ::1 
and fail to dial
+       // in pods without an IPv6 loopback, leaving the node role/labels 
unresolved.
+       assert.Equal(t, []string{"127.0.0.1:17912", "127.0.0.1:17914"},
+               GenerateClusterStateAddrs([]string{"17912", "17914"}))
+       assert.Empty(t, GenerateClusterStateAddrs(nil))
+}
diff --git a/fodc/agent/internal/cmd/root.go b/fodc/agent/internal/cmd/root.go
index 9576e9427..73ede77ef 100644
--- a/fodc/agent/internal/cmd/root.go
+++ b/fodc/agent/internal/cmd/root.go
@@ -278,6 +278,11 @@ func runFODC(_ *cobra.Command, _ []string) error {
        }
 
        wd := watchdog.NewWatchdogWithConfig(fr, metricsEndpoints, 
metricsPollInterval, nodeRole, podName, containerNames)
+       if clusterCollector != nil {
+               // Stamp the node role and labels live from the collector, 
which keeps resolving after
+               // startup, instead of the one-shot snapshot captured above.
+               wd.SetNodeInfoProvider(clusterCollector.GetNodeInfo)
+       }
 
        stopKTM := initializeKTM(ctx, log, fr)
 
@@ -434,7 +439,10 @@ func startProxyClient(ctx context.Context, log *logger.Logger, fr *flightrecorde
        }
        var lifecycleCollector *lifecycle.Collector
        if lifecyclePort > 0 {
-               grpcAddr := fmt.Sprintf("localhost:%d", lifecyclePort)
+               // Use the IPv4 loopback rather than "localhost" so the dial 
never resolves to the
+               // IPv6 loopback (::1), which fails with "cannot assign 
requested address" in pods
+               // without an IPv6 loopback.
+               grpcAddr := fmt.Sprintf("127.0.0.1:%d", lifecyclePort)
                lifecycleCollector = lifecycle.NewCollector(log, grpcAddr, 
lifecycleReportDir, lifecycleCacheTTL)
                log.Info().Str("grpc_addr", grpcAddr).Msg("Lifecycle collector 
initialized")
        }
diff --git a/fodc/agent/internal/metrics/parse.go b/fodc/agent/internal/metrics/parse.go
index 974e960d5..0cd477bd9 100644
--- a/fodc/agent/internal/metrics/parse.go
+++ b/fodc/agent/internal/metrics/parse.go
@@ -74,6 +74,12 @@ func (mk MetricKey) String() string {
        return fmt.Sprintf("%s{%s}", mk.Name, strings.Join(labelParts, ","))
 }
 
+const (
+       labelNodeRole      = "node_role"
+       labelPodName       = "pod_name"
+       labelContainerName = "container_name"
+)
+
 var (
        helpLineRegex = regexp.MustCompile(`^#\s+HELP\s+(\S+)\s+(.+)$`)
        // metricLineRegex matches metric lines: 
metric_name{label1="value1",label2="value2"} value.
@@ -123,6 +129,15 @@ func ResolveMetricType(typeMap map[string]string, name string) string {
 
 // ParseWithAgentLabels parses Prometheus text format metrics and returns 
structured RawMetric objects.
 func ParseWithAgentLabels(text string, nodeRole, podName, containerName 
string) ([]RawMetric, error) {
+       return ParseWithNodeLabels(text, nodeRole, podName, containerName, nil)
+}
+
+// ParseWithNodeLabels is ParseWithAgentLabels plus the node's own labels 
(e.g. the data-node
+// tier "type"). Each node label is stamped under a "node_" prefix (so "type" 
becomes
+// "node_type"), which keeps it from ever colliding with a metric-intrinsic 
label of the same
+// name (e.g. the merge "type"). pod_name and container_name are skipped 
because they are
+// already stamped as first-class labels.
+func ParseWithNodeLabels(text string, nodeRole, podName, containerName string, 
nodeLabels map[string]string) ([]RawMetric, error) {
        // Build authoritative type map using expfmt.
        // TextToMetricFamilies may return a non-nil error AND a non-empty 
families map
        // simultaneously (e.g. for "unexpected end of input stream" on 
trailing newline).
@@ -172,22 +187,31 @@ func ParseWithAgentLabels(text string, nodeRole, podName, containerName string)
                // Add agent identity labels if provided
                if nodeRole != "" {
                        labels = append(labels, Label{
-                               Name:  "node_role",
+                               Name:  labelNodeRole,
                                Value: nodeRole,
                        })
                }
                if podName != "" {
                        labels = append(labels, Label{
-                               Name:  "pod_name",
+                               Name:  labelPodName,
                                Value: podName,
                        })
                }
                if containerName != "" {
                        labels = append(labels, Label{
-                               Name:  "container_name",
+                               Name:  labelContainerName,
                                Value: containerName,
                        })
                }
+               for nodeLabelKey, nodeLabelValue := range nodeLabels {
+                       if nodeLabelValue == "" || nodeLabelKey == labelPodName 
|| nodeLabelKey == labelContainerName {
+                               continue
+                       }
+                       labels = append(labels, Label{
+                               Name:  "node_" + nodeLabelKey,
+                               Value: nodeLabelValue,
+                       })
+               }
 
                value, parseErr := strconv.ParseFloat(valueStr, 64)
                if parseErr != nil {
diff --git a/fodc/agent/internal/metrics/parse_test.go 
b/fodc/agent/internal/metrics/parse_test.go
index efb4adff4..6dcbddd6d 100644
--- a/fodc/agent/internal/metrics/parse_test.go
+++ b/fodc/agent/internal/metrics/parse_test.go
@@ -1151,15 +1151,15 @@ http_requests_total{method="GET"} 100`
                        if label.Value == "localhost" {
                                hasInstanceLabel = true
                        }
-               case "node_role":
+               case labelNodeRole:
                        if label.Value == testNodeRole {
                                hasNodeRole = true
                        }
-               case "pod_name":
+               case labelPodName:
                        if label.Value == "test-pod" {
                                hasPodName = true
                        }
-               case "container_name":
+               case labelContainerName:
                        if label.Value == testContainerName {
                                hasContainerName = true
                        }
@@ -1186,15 +1186,15 @@ http_requests_total{method="GET"} 100`
                        if label.Value == "GET" {
                                hasMethodLabel = true
                        }
-               case "node_role":
+               case labelNodeRole:
                        if label.Value == testNodeRole {
                                hasNodeRole = true
                        }
-               case "pod_name":
+               case labelPodName:
                        if label.Value == "test-pod" {
                                hasPodName = true
                        }
-               case "container_name":
+               case labelContainerName:
                        if label.Value == testContainerName {
                                hasContainerName = true
                        }
@@ -1231,10 +1231,10 @@ func TestParse_WithPartialAgentIdentityLabels(t *testing.T) {
        hasNodeRole := false
        hasContainerName := false
        for _, label := range metrics[0].Labels {
-               if label.Name == "node_role" && label.Value == testNodeRole {
+               if label.Name == labelNodeRole && label.Value == testNodeRole {
                        hasNodeRole = true
                }
-               if label.Name == "container_name" && label.Value == 
testContainerName {
+               if label.Name == labelContainerName && label.Value == 
testContainerName {
                        hasContainerName = true
                }
        }
@@ -1253,3 +1253,30 @@ func TestParse_WithoutAgentIdentityLabels(t *testing.T) {
        assert.Equal(t, "instance", metrics[0].Labels[0].Name)
        assert.Equal(t, "localhost", metrics[0].Labels[0].Value)
 }
+
+func TestParseWithNodeLabels_NamespacesAndSkips(t *testing.T) {
+       // A merge metric with an intrinsic "type" label, scraped from a hot 
data node.
+       text := `banyandb_measure_total_merged_parts{type="file"} 3`
+
+       metrics, err := ParseWithNodeLabels(text, "ROLE_DATA", 
"demo-banyandb-data-hot-0", "data",
+               map[string]string{"type": "hot", labelPodName: 
"demo-banyandb-data-hot-0", labelContainerName: "data", "empty": ""})
+
+       require.NoError(t, err)
+       require.Len(t, metrics, 1)
+
+       got := make(map[string]string)
+       for _, l := range metrics[0].Labels {
+               got[l.Name] = l.Value
+       }
+       // Intrinsic type is preserved; the node tier is namespaced to 
node_type (no collision).
+       assert.Equal(t, "file", got["type"])
+       assert.Equal(t, "hot", got["node_type"])
+       // First-class identity labels are not re-applied under the node_ 
prefix, and empty values are skipped.
+       assert.Equal(t, "demo-banyandb-data-hot-0", got[labelPodName])
+       _, hasNodePodName := got["node_pod_name"]
+       assert.False(t, hasNodePodName)
+       _, hasNodeContainer := got["node_container_name"]
+       assert.False(t, hasNodeContainer)
+       _, hasNodeEmpty := got["node_empty"]
+       assert.False(t, hasNodeEmpty)
+}
diff --git a/fodc/agent/internal/proxy/conn_manager.go b/fodc/agent/internal/proxy/conn_manager.go
index 8fb58f302..1debc5322 100644
--- a/fodc/agent/internal/proxy/conn_manager.go
+++ b/fodc/agent/internal/proxy/conn_manager.go
@@ -34,6 +34,10 @@ import (
 const (
        connManagerMaxRetryInterval = 60 * time.Second
        connManagerMaxRetries       = 3
+       // maxProxyCallMsgSize bounds gRPC messages exchanged with the proxy. A 
data node's metrics
+       // push can exceed the gRPC 4MB default and otherwise gets rejected 
with ResourceExhausted,
+       // causing an endless reconnect storm. Keep in sync with the proxy's 
--grpc-max-msg-size.
+       maxProxyCallMsgSize = 32 << 20 // 32MB
 )
 
 // connEventType represents the type of connection event.
@@ -248,7 +252,13 @@ func (cm *connManager) doConnect(ctx context.Context) connResult {
                default:
                }
 
-               conn, dialErr := grpc.NewClient(cm.proxyAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               conn, dialErr := grpc.NewClient(cm.proxyAddr,
+                       
grpc.WithTransportCredentials(insecure.NewCredentials()),
+                       grpc.WithDefaultCallOptions(
+                               grpc.MaxCallRecvMsgSize(maxProxyCallMsgSize),
+                               grpc.MaxCallSendMsgSize(maxProxyCallMsgSize),
+                       ),
+               )
                if dialErr == nil {
                        cm.logger.Info().Str("proxy_addr", 
cm.proxyAddr).Int("attempt", attempt).Msg("Connected to FODC Proxy")
                        return connResult{conn: conn}
diff --git a/fodc/agent/internal/watchdog/watchdog.go b/fodc/agent/internal/watchdog/watchdog.go
index 080322b91..346a3a95d 100644
--- a/fodc/agent/internal/watchdog/watchdog.go
+++ b/fodc/agent/internal/watchdog/watchdog.go
@@ -26,6 +26,7 @@ import (
        "sync"
        "time"
 
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/fodc/agent/internal/metrics"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/panicdiag"
@@ -42,26 +43,36 @@ const (
        maxRetries     = 3
        initialBackoff = 100 * time.Millisecond
        maxBackoff     = 5 * time.Second
+       // nodeResolveGracePeriod bounds how long the watchdog defers recording 
while the local
+       // node's role is still unresolved. Deferring avoids emitting metrics 
stamped with an
+       // unresolved identity that later turns into duplicate (ghost) series 
once the role
+       // resolves; after the grace period a never-resolving node still has 
its metrics collected.
+       nodeResolveGracePeriod = 5 * time.Minute
 )
 
+// roleUnspecified is the string form of an unresolved node role.
+var roleUnspecified = 
databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)]
+
 // Watchdog periodically polls metrics from BanyanDB and forwards them to 
Flight Recorder.
 type Watchdog struct {
-       recorder MetricsRecorder
-       log      *logger.Logger
-       ctx      context.Context
-       cancel   context.CancelFunc
-       client   *http.Client
-
+       startTime      time.Time
+       ctx            context.Context
+       recorder       MetricsRecorder
+       nodeInfo       func() (role string, labels map[string]string)
+       log            *logger.Logger
+       cancel         context.CancelFunc
+       client         *http.Client
+       resolvedLabels map[string]string
        nodeRole       string
        podName        string
+       resolvedRole   string
        urls           []string
        containerNames []string
-
-       interval     time.Duration
-       retryBackoff time.Duration
-       mu           sync.RWMutex
-       wg           sync.WaitGroup
-       isRunning    bool
+       wg             sync.WaitGroup
+       interval       time.Duration
+       retryBackoff   time.Duration
+       mu             sync.RWMutex
+       isRunning      bool
 }
 
 // NewWatchdogWithConfig creates a new Watchdog instance with specified 
configuration.
@@ -74,12 +85,71 @@ func NewWatchdogWithConfig(recorder MetricsRecorder, urls []string, interval tim
                ctx:            ctx,
                cancel:         cancel,
                retryBackoff:   initialBackoff,
+               startTime:      time.Now(),
                nodeRole:       nodeRole,
                podName:        podName,
                containerNames: containerNames,
        }
 }
 
+// SetNodeInfoProvider supplies a live source of the node's role and labels, 
used to stamp
+// metrics at scrape time. This avoids freezing the role/labels at a startup 
snapshot, which
+// is unreliable when the local node has not yet resolved its role when the 
agent starts.
+func (w *Watchdog) SetNodeInfoProvider(fn func() (role string, labels 
map[string]string)) {
+       w.mu.Lock()
+       w.nodeInfo = fn
+       w.mu.Unlock()
+}
+
+// resolveNodeInfo returns the current node role and labels from the live 
provider, falling back
+// to the static role captured at construction when no provider is set. The 
first resolved
+// identity is cached and "sticks": if the provider later regresses to an 
unresolved role
+// (ROLE_UNSPECIFIED), the cached resolved identity is returned instead. 
Without this, a regression
+// would cause the flight recorder to buffer a duplicate (ghost) series under 
the unresolved
+// identity, which is never evicted.
+func (w *Watchdog) resolveNodeInfo() (role string, labels map[string]string) {
+       w.mu.RLock()
+       fn := w.nodeInfo
+       cachedRole, cachedLabels := w.resolvedRole, w.resolvedLabels
+       w.mu.RUnlock()
+       if fn == nil {
+               return w.nodeRole, nil
+       }
+       role, labels = fn()
+       if role != "" && role != roleUnspecified {
+               w.mu.Lock()
+               w.resolvedRole, w.resolvedLabels = role, labels
+               w.mu.Unlock()
+               return role, labels
+       }
+       if cachedRole != "" {
+               return cachedRole, cachedLabels
+       }
+       return role, labels
+}
+
+// nodeReadyToRecord reports whether the watchdog should record metrics this 
cycle. While a live
+// node-info provider is configured but the node role has not resolved yet, 
recording is deferred
+// (up to nodeResolveGracePeriod) so the flight recorder never buffers metrics 
under an unresolved
+// identity — those would otherwise linger as duplicate (ghost) series once 
the role resolves and
+// the identity labels change. Once resolved, the gate opens permanently; if 
the role never
+// resolves, the grace period ensures the node's metrics are still recorded.
+func (w *Watchdog) nodeReadyToRecord() bool {
+       w.mu.RLock()
+       fn := w.nodeInfo
+       start := w.startTime
+       w.mu.RUnlock()
+       if fn == nil {
+               return true
+       }
+       // resolveNodeInfo caches and sticks to the first resolved identity, so 
once the role has
+       // resolved this stays true even if the live provider briefly regresses 
to unspecified.
+       if role, _ := w.resolveNodeInfo(); role != "" && role != 
roleUnspecified {
+               return true
+       }
+       return time.Since(start) >= nodeResolveGracePeriod
+}
+
 // Name returns the name of the watchdog service.
 func (w *Watchdog) Name() string {
        return "watchdog"
@@ -222,6 +292,10 @@ func (w *Watchdog) GracefulStop() {
 // It returns the enriched context so the caller can update the 
recovery-visible
 // ctx pointer, making breadcrumbs visible if a panic occurs.
 func (w *Watchdog) pollAndForward(ctx context.Context) (context.Context, 
error) {
+       if !w.nodeReadyToRecord() {
+               w.log.Debug().Msg("Deferring metrics recording until the node 
role resolves (avoids duplicate series)")
+               return ctx, nil
+       }
        ctx = panicdiag.WithBreadcrumb(ctx, "poll watchdog metrics", 
"fodc-watchdog", map[string]string{
                "endpoint_count": fmt.Sprintf("%d", len(w.urls)),
        })
@@ -328,7 +402,8 @@ func (w *Watchdog) pollMetricsFromEndpoint(ctx context.Context, url string, cont
                        currentBackoff = w.exponentialBackoff(currentBackoff)
                        continue
                }
-               parsedMetrics, parseErr := 
metrics.ParseWithAgentLabels(string(body), w.nodeRole, w.podName, containerName)
+               nodeRole, nodeLabels := w.resolveNodeInfo()
+               parsedMetrics, parseErr := 
metrics.ParseWithNodeLabels(string(body), nodeRole, w.podName, containerName, 
nodeLabels)
                if parseErr != nil {
                        lastErr = fmt.Errorf("failed to parse metrics: %w", 
parseErr)
                        currentBackoff = w.exponentialBackoff(currentBackoff)
diff --git a/fodc/agent/internal/watchdog/watchdog_test.go b/fodc/agent/internal/watchdog/watchdog_test.go
index a8ac7e807..76a746a3d 100644
--- a/fodc/agent/internal/watchdog/watchdog_test.go
+++ b/fodc/agent/internal/watchdog/watchdog_test.go
@@ -828,3 +828,55 @@ func TestWatchdog_PollMetrics_NoEndpoints(t *testing.T) {
        assert.Nil(t, rawMetrics)
        assert.Contains(t, err.Error(), "no metrics endpoints configured")
 }
+
+func TestNodeReadyToRecord(t *testing.T) {
+       // No live provider configured: always ready (preserves behavior 
without a collector).
+       noProvider := NewWatchdogWithConfig(nil, nil, time.Second, "ROLE_DATA", 
"p", []string{"data"})
+       assert.True(t, noProvider.nodeReadyToRecord())
+
+       // Provider reports an unresolved role within the grace period: defer 
recording.
+       unresolved := NewWatchdogWithConfig(nil, nil, time.Second, "", "p", 
[]string{"data"})
+       unresolved.SetNodeInfoProvider(func() (string, map[string]string) { 
return roleUnspecified, nil })
+       assert.False(t, unresolved.nodeReadyToRecord())
+
+       // Once a resolved role is seen the gate opens and stays open even if 
it regresses.
+       regress := false
+       flips := NewWatchdogWithConfig(nil, nil, time.Second, "", "p", 
[]string{"data"})
+       flips.SetNodeInfoProvider(func() (string, map[string]string) {
+               if regress {
+                       return roleUnspecified, nil
+               }
+               return "ROLE_DATA", map[string]string{"type": "cold"}
+       })
+       assert.True(t, flips.nodeReadyToRecord())
+       regress = true
+       assert.True(t, flips.nodeReadyToRecord(), "should stay ready once 
resolved")
+
+       // Unresolved but past the grace period: record anyway so metrics are 
never permanently dropped.
+       stuck := NewWatchdogWithConfig(nil, nil, time.Second, "", "p", 
[]string{"data"})
+       stuck.SetNodeInfoProvider(func() (string, map[string]string) { return 
roleUnspecified, nil })
+       stuck.startTime = time.Now().Add(-2 * nodeResolveGracePeriod)
+       assert.True(t, stuck.nodeReadyToRecord(), "should record after the 
grace period even if unresolved")
+}
+
+func TestResolveNodeInfoSticky(t *testing.T) {
+       regress := false
+       wd := NewWatchdogWithConfig(nil, nil, time.Second, "", "p", 
[]string{"data"})
+       wd.SetNodeInfoProvider(func() (string, map[string]string) {
+               if regress {
+                       return roleUnspecified, nil
+               }
+               return "ROLE_DATA", map[string]string{"type": "warm"}
+       })
+
+       role, labels := wd.resolveNodeInfo()
+       assert.Equal(t, "ROLE_DATA", role)
+       assert.Equal(t, "warm", labels["type"])
+
+       // Once resolved, a regression to an unresolved role must not be 
returned, otherwise the
+       // flight recorder buffers a duplicate ghost series under the 
unresolved identity.
+       regress = true
+       role, labels = wd.resolveNodeInfo()
+       assert.Equal(t, "ROLE_DATA", role, "must stick to the resolved role")
+       assert.Equal(t, "warm", labels["type"], "must stick to the resolved 
labels")
+}
diff --git a/fodc/proxy/cmd/proxy/main.go b/fodc/proxy/cmd/proxy/main.go
index 510360204..2c36bd145 100644
--- a/fodc/proxy/cmd/proxy/main.go
+++ b/fodc/proxy/cmd/proxy/main.go
@@ -44,7 +44,7 @@ const (
        defaultHeartbeatTimeout  = 30 * time.Second
        defaultCleanupTimeout    = 5 * time.Minute
        defaultMaxAgents         = 1000
-       defaultGRPCMaxMsgSize    = 4194304 // 4MB
+       defaultGRPCMaxMsgSize    = 32 << 20 // 32MB; a single data node's 
metrics push can exceed the gRPC 4MB default.
        defaultHTTPReadTimeout   = 10 * time.Second
        defaultHTTPWriteTimeout  = 10 * time.Second
        defaultHeartbeatInterval = 10 * time.Second
diff --git a/fodc/proxy/internal/metrics/aggregator.go b/fodc/proxy/internal/metrics/aggregator.go
index 986352a62..3757dec7f 100644
--- a/fodc/proxy/internal/metrics/aggregator.go
+++ b/fodc/proxy/internal/metrics/aggregator.go
@@ -20,6 +20,7 @@ package metrics
 
 import (
        "context"
+       "maps"
        "strings"
        "sync"
        "time"
@@ -35,6 +36,13 @@ const (
        // maxCollectionTimeout is the maximum timeout allowed for collecting 
metrics,
        // preventing excessively long waits for wide time windows.
        maxCollectionTimeout = 5 * time.Minute
+       // podNameLabelName and containerNameLabelName are first-class node 
identity labels that are
+       // already stamped per metric, so they are not re-applied as namespaced 
node labels.
+       podNameLabelName       = "pod_name"
+       containerNameLabelName = "container_name"
+       // nodeLabelPrefix namespaces a node's own labels (e.g. the data-node 
tier "type" becomes
+       // "node_type") so they can never collide with a metric-intrinsic label 
of the same name.
+       nodeLabelPrefix = "node_"
 )
 
 // AggregatedMetric represents an aggregated metric with node metadata.
@@ -113,13 +121,21 @@ func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID strin
        aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics))
 
        for _, metric := range req.Metrics {
-               labels := make(map[string]string)
-               for key, value := range metric.Labels {
-                       labels[key] = value
-               }
+               labels := make(map[string]string, len(metric.Labels))
+               maps.Copy(labels, metric.Labels)
 
+               // Overlay the agent's node labels under a "node_" prefix so 
they can never collide
+               // with a metric-intrinsic label of the same name (e.g. the 
merge "type"). pod_name and
+               // container_name are already first-class labels and are 
skipped. A namespaced label
+               // already present (stamped per metric by the agent) is left 
untouched.
                for key, value := range agentInfo.Labels {
-                       labels[key] = value
+                       if value == "" || key == podNameLabelName || key == 
containerNameLabelName {
+                               continue
+                       }
+                       prefixed := nodeLabelPrefix + key
+                       if _, exists := labels[prefixed]; !exists {
+                               labels[prefixed] = value
+                       }
                }
 
                var timestamp time.Time
diff --git a/fodc/proxy/internal/metrics/aggregator_test.go b/fodc/proxy/internal/metrics/aggregator_test.go
index 678b07156..a7fb6a637 100644
--- a/fodc/proxy/internal/metrics/aggregator_test.go
+++ b/fodc/proxy/internal/metrics/aggregator_test.go
@@ -251,8 +251,11 @@ func TestProcessMetricsFromAgent_WithAgentLabels(t *testing.T) {
        case metrics := <-collectCh:
                require.Equal(t, 1, len(metrics))
                metric := metrics[0]
-               assert.Equal(t, "prod", metric.Labels["env"])
-               assert.Equal(t, "us-east", metric.Labels["zone"])
+               // Custom node labels are exposed under the node_ prefix.
+               assert.Equal(t, "prod", metric.Labels["node_env"])
+               assert.Equal(t, "us-east", metric.Labels["node_zone"])
+               // The metric's own intrinsic labels and first-class identity 
labels are untouched.
+               assert.Equal(t, "heap", metric.Labels["type"])
                assert.Equal(t, "test", metric.Labels["pod_name"])
                assert.Equal(t, "master", metric.Labels["container_name"])
        case <-time.After(1 * time.Second):
@@ -818,3 +821,65 @@ func TestMatchesAddress_PortMismatch(t *testing.T) {
 
        assert.False(t, result)
 }
+
+func collectOneMetric(t *testing.T, aggregator *Aggregator, agentID string, 
agentInfo *registry.AgentInfo,
+       req *fodcv1.StreamMetricsRequest,
+) *AggregatedMetric {
+       t.Helper()
+       aggregator.collectingMu.Lock()
+       collectCh := make(chan []*AggregatedMetric, 1)
+       aggregator.collecting[agentID] = collectCh
+       aggregator.collectingMu.Unlock()
+
+       require.NoError(t, 
aggregator.ProcessMetricsFromAgent(context.Background(), agentID, agentInfo, 
req))
+
+       select {
+       case got := <-collectCh:
+               require.Len(t, got, 1)
+               return got[0]
+       case <-time.After(time.Second):
+               t.Fatal("timeout waiting for metrics")
+               return nil
+       }
+}
+
+func TestProcessMetricsFromAgent_NamespacesNodeLabels(t *testing.T) {
+       aggregator, testRegistry, _ := newTestAggregator(t)
+       // The agent reports node labels: the tier ("type") plus pod_name 
(which is already a
+       // first-class label and must not be re-applied as node_pod_name).
+       agentID := createTestAgent(t, testRegistry, 
"demo-banyandb-data-warm-1", "datanode-warm",
+               map[string]string{"type": "warm", "pod_name": 
"demo-banyandb-data-warm-1"})
+       agentInfo, getErr := testRegistry.GetAgentByID(agentID)
+       require.NoError(t, getErr)
+
+       req := 
createTestStreamMetricsRequest("banyandb_queue_sub_total_msg_received", 5, 
map[string]string{
+               "pod_name": "demo-banyandb-data-warm-1",
+               "topic":    "v1:stream-query",
+       }, nil)
+
+       metric := collectOneMetric(t, aggregator, agentID, agentInfo, req)
+       assert.Equal(t, "warm", metric.Labels["node_type"], "tier exposed as 
node_type")
+       _, hasRawType := metric.Labels["type"]
+       assert.False(t, hasRawType, "node label must not appear under the raw 
key")
+       _, hasNodePodName := metric.Labels["node_pod_name"]
+       assert.False(t, hasNodePodName, "pod_name must not be namespaced")
+}
+
+func TestProcessMetricsFromAgent_NodeLabelDoesNotClobberMetricLabel(t 
*testing.T) {
+       aggregator, testRegistry, _ := newTestAggregator(t)
+       // The agent reports a node "type" label (the data-node storage tier).
+       agentID := createTestAgent(t, testRegistry, "demo-banyandb-data-hot-0", 
"datanode-hot", map[string]string{"type": "hot"})
+       agentInfo, getErr := testRegistry.GetAgentByID(agentID)
+       require.NoError(t, getErr)
+
+       // The metric carries its own intrinsic "type" label (e.g. a merge part 
type).
+       req := 
createTestStreamMetricsRequest("banyandb_measure_total_merged_parts", 3, 
map[string]string{
+               "pod_name": "demo-banyandb-data-hot-0",
+               "type":     "file",
+       }, nil)
+
+       metric := collectOneMetric(t, aggregator, agentID, agentInfo, req)
+       // The metric's intrinsic type is preserved; the node tier lands on 
node_type, no collision.
+       assert.Equal(t, "file", metric.Labels["type"])
+       assert.Equal(t, "hot", metric.Labels["node_type"])
+}

Reply via email to