This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 45e667437 Thread Prometheus metric type end-to-end through FODC agent
and proxy (#1153)
45e667437 is described below
commit 45e667437a4b3a86bc3f01f22867a71e9e97bf53
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jun 4 13:55:56 2026 +0800
Thread Prometheus metric type end-to-end through FODC agent and proxy
(#1153)
---
CHANGES.md | 1 +
api/proto/banyandb/fodc/v1/rpc.proto | 11 +
docs/api-reference.md | 19 ++
fodc/agent/internal/exporter/exporter.go | 19 +-
.../internal/exporter/prometheus_format_test.go | 3 +-
fodc/agent/internal/flightrecorder/datasource.go | 20 ++
fodc/agent/internal/metrics/parse.go | 57 ++++++
fodc/agent/internal/metrics/parse_test.go | 45 +++++
fodc/agent/internal/proxy/client.go | 27 ++-
fodc/agent/internal/proxy/client_test.go | 9 +-
fodc/agent/testhelper/flightrecorder.go | 2 +
fodc/proxy/internal/api/server.go | 221 +++++++++++++++------
fodc/proxy/internal/api/server_test.go | 164 +++++++++++++++
fodc/proxy/internal/metrics/aggregator.go | 21 ++
go.mod | 2 +-
15 files changed, 553 insertions(+), 68 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2170765a8..19ab8f97d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -104,6 +104,7 @@ Release Notes.
- Fix trace query identity-tag projection: when `trace_id`/`span_id` are
explicitly projected, reconstruct them from span identity at response build
time instead of requesting them as stored tags, and preserve tag order with
null-filled per-span value alignment in the distributed trace result iterator.
- Fix measure, stream, and trace queries returning data from segments already
expired by the TTL. Retention removes a segment only on its next scheduled run,
so a fully expired segment can linger on disk and keep serving TTL-expired
data; queries now skip segments whose whole time range is past the retention
deadline, matching retention's own removal condition.
- Fix flaky measure snapshot tests that gated on the part directory appearing
in `tab/` as the flush-completion signal. That directory is created by the
first line of `memPart.mustFlush`, before the mem→file introduction reaches the
in-memory snapshot and before the `.snp` manifest is persisted, so under
`-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close`
could drop the in-flight flush; gate on the persisted `.snp` manifest instead.
+- Fix FODC proxy corrupting Prometheus metric types. The agent dropped the `#
TYPE` line while parsing banyandb `/metrics`, the `StreamMetrics` proto carried
no type field, and the proxy guessed the type from a name-suffix heuristic —
downgrading counters to gauge, mislabeling `_count`-suffixed counters as
histograms, and splitting summaries into two conflicting `# TYPE` lines.
Capture the type with the Prometheus `expfmt` parser, store it in the flight
recorder, thread it through a new [...]
### Chores
diff --git a/api/proto/banyandb/fodc/v1/rpc.proto
b/api/proto/banyandb/fodc/v1/rpc.proto
index a71f3d0b7..825f5fddc 100644
--- a/api/proto/banyandb/fodc/v1/rpc.proto
+++ b/api/proto/banyandb/fodc/v1/rpc.proto
@@ -64,12 +64,23 @@ message StreamMetricsRequest {
google.protobuf.Timestamp timestamp = 2;
}
+// MetricType represents the Prometheus metric type.
+enum MetricType {
+ METRIC_TYPE_UNSPECIFIED = 0;
+ METRIC_TYPE_GAUGE = 1;
+ METRIC_TYPE_COUNTER = 2;
+ METRIC_TYPE_HISTOGRAM = 3;
+ METRIC_TYPE_SUMMARY = 4;
+ METRIC_TYPE_UNTYPED = 5;
+}
+
message Metric {
string name = 1;
map<string, string> labels = 2;
double value = 3;
string description = 4;
google.protobuf.Timestamp timestamp = 5;
+ MetricType type = 6;
}
message StreamMetricsResponse {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index ce040f95f..0090d8438 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -359,6 +359,8 @@
- [StreamMetricsResponse](#banyandb-fodc-v1-StreamMetricsResponse)
- [Topology](#banyandb-fodc-v1-Topology)
+ - [MetricType](#banyandb-fodc-v1-MetricType)
+
- [FODCService](#banyandb-fodc-v1-FODCService)
- [GroupLifecycleService](#banyandb-fodc-v1-GroupLifecycleService)
@@ -5441,6 +5443,7 @@ Phase represents the current phase of the deletion task.
| value | [double](#double) | | |
| description | [string](#string) | | |
| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | |
+| type | [MetricType](#banyandb-fodc-v1-MetricType) | | |
@@ -5661,6 +5664,22 @@ Phase represents the current phase of the deletion task.
+
+<a name="banyandb-fodc-v1-MetricType"></a>
+
+### MetricType
+MetricType represents the Prometheus metric type.
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| METRIC_TYPE_UNSPECIFIED | 0 | |
+| METRIC_TYPE_GAUGE | 1 | |
+| METRIC_TYPE_COUNTER | 2 | |
+| METRIC_TYPE_HISTOGRAM | 3 | |
+| METRIC_TYPE_SUMMARY | 4 | |
+| METRIC_TYPE_UNTYPED | 5 | |
+
+
diff --git a/fodc/agent/internal/exporter/exporter.go
b/fodc/agent/internal/exporter/exporter.go
index 20569a19c..c50716a3e 100644
--- a/fodc/agent/internal/exporter/exporter.go
+++ b/fodc/agent/internal/exporter/exporter.go
@@ -87,6 +87,21 @@ func (dc *DatasourceCollector) Describe(ch chan<-
*prometheus.Desc) {
}
}
+// chooseValueType maps a lowercase prometheus type string to a
prometheus.ValueType
+// suitable for NewConstMetric. Histogram and summary cannot be represented as
scalar
+// const metrics, so they degrade to UntypedValue.
+func chooseValueType(t string) prometheus.ValueType {
+ switch t {
+ case "counter":
+ return prometheus.CounterValue
+ case "histogram", "summary", "untyped":
+ return prometheus.UntypedValue
+ default:
+ // gauge or "" (unknown) → gauge
+ return prometheus.GaugeValue
+ }
+}
+
// Collect implements prometheus.Collector interface.
// It collects metrics from all Datasources and sends them to the channel.
func (dc *DatasourceCollector) Collect(ch chan<- prometheus.Metric) {
@@ -94,6 +109,7 @@ func (dc *DatasourceCollector) Collect(ch chan<-
prometheus.Metric) {
for _, ds := range datasources {
metricsMap := ds.GetMetrics()
descriptions := ds.GetDescriptions()
+ types := ds.GetTypes()
timestamps := ds.GetTimestamps()
for metricKeyStr, metricBuffer := range metricsMap {
@@ -128,9 +144,10 @@ func (dc *DatasourceCollector) Collect(ch chan<-
prometheus.Metric) {
labelValues[idx] = labelValueMap[labelName]
}
+ metricType := metrics.ResolveMetricType(types,
parsedKey.name)
metric, createErr := prometheus.NewConstMetric(
desc,
- prometheus.GaugeValue,
+ chooseValueType(metricType),
currentValue,
labelValues...,
)
diff --git a/fodc/agent/internal/exporter/prometheus_format_test.go
b/fodc/agent/internal/exporter/prometheus_format_test.go
index ea854a571..25665d5d2 100644
--- a/fodc/agent/internal/exporter/prometheus_format_test.go
+++ b/fodc/agent/internal/exporter/prometheus_format_test.go
@@ -44,6 +44,7 @@ func TestPrometheusFormatConversion(t *testing.T) {
Value: 100.0,
Labels: []metrics.Label{{Name: "method", Value: "GET"},
{Name: "status", Value: "200"}},
Desc: "Total number of HTTP requests",
+ Type: "counter",
},
{
Name: "cpu_usage",
@@ -126,7 +127,7 @@ func TestPrometheusFormatConversion(t *testing.T) {
// Check TYPE lines
if strings.HasPrefix(line, "# TYPE") {
- if strings.Contains(line, "http_requests_total") &&
strings.Contains(line, "gauge") {
+ if strings.Contains(line, "http_requests_total") &&
strings.Contains(line, "counter") {
hasTypeHTTPRequests = true
}
if strings.Contains(line, "cpu_usage") &&
strings.Contains(line, "gauge") {
diff --git a/fodc/agent/internal/flightrecorder/datasource.go
b/fodc/agent/internal/flightrecorder/datasource.go
index e8afff56c..747b467b0 100644
--- a/fodc/agent/internal/flightrecorder/datasource.go
+++ b/fodc/agent/internal/flightrecorder/datasource.go
@@ -67,6 +67,7 @@ func UpdateTimestampRingBuffer(trb *TimestampRingBuffer, v
int64) {
type Datasource struct {
metrics map[string]*MetricRingBuffer // Map from metric
name+labels to RingBuffer storing metric values
descriptions map[string]string // Map from metric name to
HELP content descriptions
+ types map[string]string // Map from metric name to
Prometheus type string
timestamps *TimestampRingBuffer // RingBuffer storing
timestamps for each polling cycle
mu sync.RWMutex
CapacitySize int64 // Memory limit in bytes
@@ -78,6 +79,7 @@ func NewDatasource() *Datasource {
metrics: make(map[string]*MetricRingBuffer),
timestamps: NewTimestampRingBuffer(),
descriptions: make(map[string]string),
+ types: make(map[string]string),
CapacitySize: 0,
}
}
@@ -103,6 +105,9 @@ func (ds *Datasource) Update(m *metrics.RawMetric) error {
if m.Desc != "" {
ds.descriptions[m.Name] = m.Desc
}
+ if m.Type != "" {
+ ds.types[m.Name] = m.Type
+ }
UpdateMetricRingBuffer(ds.metrics[metricKey], m.Value)
return nil
@@ -140,6 +145,9 @@ func (ds *Datasource) UpdateBatch(rawMetrics
[]metrics.RawMetric, timestamp int6
if m.Desc != "" {
ds.descriptions[m.Name] = m.Desc
}
+ if m.Type != "" {
+ ds.types[m.Name] = m.Type
+ }
buffer := ds.metrics[metricKey]
UpdateMetricRingBuffer(buffer, m.Value)
updatedBuffers = append(updatedBuffers, buffer)
@@ -259,3 +267,15 @@ func (ds *Datasource) GetDescriptions() map[string]string {
}
return result
}
+
+// GetTypes returns a copy of the types map.
+func (ds *Datasource) GetTypes() map[string]string {
+ ds.mu.RLock()
+ defer ds.mu.RUnlock()
+
+ result := make(map[string]string)
+ for k, v := range ds.types {
+ result[k] = v
+ }
+ return result
+}
diff --git a/fodc/agent/internal/metrics/parse.go
b/fodc/agent/internal/metrics/parse.go
index 5a4375142..974e960d5 100644
--- a/fodc/agent/internal/metrics/parse.go
+++ b/fodc/agent/internal/metrics/parse.go
@@ -24,6 +24,10 @@ import (
"sort"
"strconv"
"strings"
+
+ dto "github.com/prometheus/client_model/go"
+ "github.com/prometheus/common/expfmt"
+ "github.com/prometheus/common/model"
)
// Label represents a metric label as a key-value pair.
@@ -36,6 +40,7 @@ type Label struct {
type RawMetric struct {
Name string
Desc string
+ Type string // lowercase prometheus type:
"counter","gauge","histogram","summary","untyped",""
Labels []Label
Value float64
}
@@ -76,8 +81,59 @@ var (
labelRegex = regexp.MustCompile(`(\w+)="([^"]+)"`)
)
+// metricTypeToString converts a dto.MetricType to its lowercase Prometheus
text representation.
+func metricTypeToString(mt dto.MetricType) string {
+ switch mt {
+ case dto.MetricType_COUNTER:
+ return "counter"
+ case dto.MetricType_GAUGE:
+ return "gauge"
+ case dto.MetricType_SUMMARY:
+ return "summary"
+ case dto.MetricType_HISTOGRAM:
+ return "histogram"
+ case dto.MetricType_UNTYPED:
+ return "untyped"
+ default:
+ return ""
+ }
+}
+
+// ResolveMetricType resolves the metric type for a given series name using
the typeMap.
+// For histogram/summary component series (ending in _bucket/_sum/_count), the
base family
+// name is used to look up the type. Returns "" if unknown.
+func ResolveMetricType(typeMap map[string]string, name string) string {
+ if t, ok := typeMap[name]; ok {
+ return t
+ }
+ // Trim histogram/summary suffixes to find the base family name.
+ for _, suffix := range []string{"_bucket", "_sum", "_count"} {
+ if strings.HasSuffix(name, suffix) {
+ base := strings.TrimSuffix(name, suffix)
+ if base != "" {
+ if t, ok := typeMap[base]; ok {
+ return t
+ }
+ }
+ break
+ }
+ }
+ return ""
+}
+
// ParseWithAgentLabels parses Prometheus text format metrics and returns
structured RawMetric objects.
func ParseWithAgentLabels(text string, nodeRole, podName, containerName
string) ([]RawMetric, error) {
+ // Build authoritative type map using expfmt.
+ // TextToMetricFamilies may return a non-nil error AND a non-empty
families map
+ // simultaneously (e.g. for "unexpected end of input stream" when the
input lacks a trailing newline).
+ // We always use whatever families were successfully parsed, ignoring
parse errors.
+ typeMap := make(map[string]string)
+ parser := expfmt.NewTextParser(model.LegacyValidation)
+ families, _ := parser.TextToMetricFamilies(strings.NewReader(text))
+ for familyName, mf := range families {
+ typeMap[familyName] = metricTypeToString(mf.GetType())
+ }
+
lines := strings.Split(text, "\n")
var metrics []RawMetric
helpMap := make(map[string]string)
@@ -145,6 +201,7 @@ func ParseWithAgentLabels(text string, nodeRole, podName,
containerName string)
Labels: labels,
Value: value,
Desc: desc,
+ Type: ResolveMetricType(typeMap, metricName),
})
}
diff --git a/fodc/agent/internal/metrics/parse_test.go
b/fodc/agent/internal/metrics/parse_test.go
index fab8c5fdf..efb4adff4 100644
--- a/fodc/agent/internal/metrics/parse_test.go
+++ b/fodc/agent/internal/metrics/parse_test.go
@@ -1076,6 +1076,51 @@ func TestParse_LabelValueWithNumbers(t *testing.T) {
assert.Equal(t, "value123", metrics[0].Labels[0].Value)
}
+// TestParse_TypePropagation verifies that expfmt-derived types are correctly
set on
+// RawMetric.Type, including base-name resolution for histogram component
series.
+func TestParse_TypePropagation(t *testing.T) {
+ text := `# HELP http_requests_total Total HTTP requests
+# TYPE http_requests_total counter
+http_requests_total{method="GET"} 1234
+# HELP request_latency Request latency histogram
+# TYPE request_latency histogram
+request_latency_bucket{le="0.1"} 10
+request_latency_bucket{le="0.5"} 25
+request_latency_bucket{le="+Inf"} 30
+request_latency_sum 12.5
+request_latency_count 30
+# HELP cpu_usage CPU usage
+# TYPE cpu_usage gauge
+cpu_usage 75.5`
+
+ result, parseErr := ParseWithAgentLabels(text, "", "", "")
+
+ require.NoError(t, parseErr)
+
+ // Build a name→type map for easy assertion.
+ typeByName := make(map[string][]string)
+ for _, m := range result {
+ typeByName[m.Name] = append(typeByName[m.Name], m.Type)
+ }
+
+ // Counter: all series for http_requests_total must be "counter".
+ for _, typ := range typeByName["http_requests_total"] {
+ assert.Equal(t, "counter", typ, "http_requests_total should be
counter")
+ }
+
+ // Gauge.
+ for _, typ := range typeByName["cpu_usage"] {
+ assert.Equal(t, "gauge", typ, "cpu_usage should be gauge")
+ }
+
+ // Histogram component series must all resolve to "histogram".
+ for _, name := range []string{"request_latency_bucket",
"request_latency_sum", "request_latency_count"} {
+ for _, typ := range typeByName[name] {
+ assert.Equal(t, "histogram", typ, "%s should resolve to
histogram via base name", name)
+ }
+ }
+}
+
func TestParse_WithAgentIdentityLabels(t *testing.T) {
text := `cpu_usage{instance="localhost"} 75.5
http_requests_total{method="GET"} 100`
diff --git a/fodc/agent/internal/proxy/client.go
b/fodc/agent/internal/proxy/client.go
index 3f954e6e9..b18305caa 100644
--- a/fodc/agent/internal/proxy/client.go
+++ b/fodc/agent/internal/proxy/client.go
@@ -712,6 +712,7 @@ func (c *Client) RetrieveAndSendMetrics(_ context.Context,
filter *MetricsReques
allMetrics := ds.GetMetrics()
timestamps := ds.GetTimestamps()
descriptions := ds.GetDescriptions()
+ types := ds.GetTypes()
if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) {
if timestamps == nil {
@@ -756,7 +757,7 @@ func (c *Client) RetrieveAndSendMetrics(_ context.Context,
filter *MetricsReques
c.streamsMu.Unlock()
return fmt.Errorf("metrics stream changed during send")
}
- sendErr := c.sendFilteredMetrics(metricsStream, allMetrics,
timestampValues, descriptions, filter)
+ sendErr := c.sendFilteredMetrics(metricsStream, allMetrics,
timestampValues, descriptions, types, filter)
c.streamsMu.Unlock()
return sendErr
}
@@ -766,16 +767,35 @@ func (c *Client) RetrieveAndSendMetrics(_
context.Context, filter *MetricsReques
c.streamsMu.Unlock()
return fmt.Errorf("metrics stream changed during send")
}
- sendErr := c.sendLatestMetrics(metricsStream, allMetrics, descriptions)
+ sendErr := c.sendLatestMetrics(metricsStream, allMetrics, descriptions,
types)
c.streamsMu.Unlock()
return sendErr
}
+// toProtoMetricType converts a lowercase prometheus type string to the proto
enum.
+func toProtoMetricType(t string) fodcv1.MetricType {
+ switch t {
+ case "counter":
+ return fodcv1.MetricType_METRIC_TYPE_COUNTER
+ case "gauge":
+ return fodcv1.MetricType_METRIC_TYPE_GAUGE
+ case "histogram":
+ return fodcv1.MetricType_METRIC_TYPE_HISTOGRAM
+ case "summary":
+ return fodcv1.MetricType_METRIC_TYPE_SUMMARY
+ case "untyped":
+ return fodcv1.MetricType_METRIC_TYPE_UNTYPED
+ default:
+ return fodcv1.MetricType_METRIC_TYPE_UNSPECIFIED
+ }
+}
+
// sendLatestMetrics sends the latest metrics (most recent values).
func (c *Client) sendLatestMetrics(
stream fodcv1.FODCService_StreamMetricsClient,
allMetrics map[string]*flightrecorder.MetricRingBuffer,
descriptions map[string]string,
+ types map[string]string,
) error {
protoMetrics := make([]*fodcv1.Metric, 0)
@@ -803,6 +823,7 @@ func (c *Client) sendLatestMetrics(
Labels: labelsMap,
Value: metricValue,
Description: descriptions[parsedKey.Name],
+ Type:
toProtoMetricType(metrics.ResolveMetricType(types, parsedKey.Name)),
}
protoMetrics = append(protoMetrics, protoMetric)
@@ -826,6 +847,7 @@ func (c *Client) sendFilteredMetrics(
allMetrics map[string]*flightrecorder.MetricRingBuffer,
timestampValues []int64,
descriptions map[string]string,
+ types map[string]string,
filter *MetricsRequestFilter,
) error {
protoMetrics := make([]*fodcv1.Metric, 0)
@@ -871,6 +893,7 @@ func (c *Client) sendFilteredMetrics(
Value: metricValues[idx],
Description: description,
Timestamp: timestamppb.New(timestamp),
+ Type:
toProtoMetricType(metrics.ResolveMetricType(types, parsedKey.Name)),
}
protoMetrics = append(protoMetrics, protoMetric)
diff --git a/fodc/agent/internal/proxy/client_test.go
b/fodc/agent/internal/proxy/client_test.go
index 01cb7cdd9..b88e77726 100644
--- a/fodc/agent/internal/proxy/client_test.go
+++ b/fodc/agent/internal/proxy/client_test.go
@@ -1333,11 +1333,12 @@ func
TestProxyClient_sendLatestMetrics_WithUnfinalizedValue(t *testing.T) {
allMetrics := ds.GetMetrics()
descriptions := ds.GetDescriptions()
+ types := ds.GetTypes()
ctx := context.Background()
mockStream := newMockStreamMetricsClient(ctx)
- err := pc.sendLatestMetrics(mockStream, allMetrics, descriptions)
+ err := pc.sendLatestMetrics(mockStream, allMetrics, descriptions, types)
require.NoError(t, err)
mockStream.mu.Lock()
@@ -1376,6 +1377,7 @@ func TestProxyClient_sendFilteredMetrics_TimeWindow(t
*testing.T) {
timestamps := ds.GetTimestamps()
timestampValues := timestamps.GetAllValues()
descriptions := ds.GetDescriptions()
+ types := ds.GetTypes()
startTime := now.Add(-90 * time.Minute)
endTime := now.Add(-30 * time.Minute)
@@ -1387,7 +1389,7 @@ func TestProxyClient_sendFilteredMetrics_TimeWindow(t
*testing.T) {
ctx := context.Background()
mockStream := newMockStreamMetricsClient(ctx)
- err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues,
descriptions, filter)
+ err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues,
descriptions, types, filter)
require.NoError(t, err)
mockStream.mu.Lock()
@@ -1419,6 +1421,7 @@ func
TestProxyClient_sendFilteredMetrics_NoMatchingTimeWindow(t *testing.T) {
timestamps := ds.GetTimestamps()
timestampValues := timestamps.GetAllValues()
descriptions := ds.GetDescriptions()
+ types := ds.GetTypes()
// Filter for future time window (no matches)
startTime := now.Add(1 * time.Hour)
@@ -1431,7 +1434,7 @@ func
TestProxyClient_sendFilteredMetrics_NoMatchingTimeWindow(t *testing.T) {
ctx := context.Background()
mockStream := newMockStreamMetricsClient(ctx)
- err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues,
descriptions, filter)
+ err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues,
descriptions, types, filter)
require.NoError(t, err)
mockStream.mu.Lock()
diff --git a/fodc/agent/testhelper/flightrecorder.go
b/fodc/agent/testhelper/flightrecorder.go
index fc2e0b89d..e06170cd7 100644
--- a/fodc/agent/testhelper/flightrecorder.go
+++ b/fodc/agent/testhelper/flightrecorder.go
@@ -43,6 +43,7 @@ func NewFlightRecorder(capacitySize int64)
*flightrecorder.FlightRecorder {
type RawMetric struct {
Desc string
Name string
+ Type string
Labels []Label
Value float64
}
@@ -74,6 +75,7 @@ func UpdateMetrics(fr interface{}, rawMetrics []RawMetric)
error {
Name: rawMetrics[idx].Name,
Value: rawMetrics[idx].Value,
Desc: rawMetrics[idx].Desc,
+ Type: rawMetrics[idx].Type,
Labels: internalLabels,
}
}
diff --git a/fodc/proxy/internal/api/server.go
b/fodc/proxy/internal/api/server.go
index 203eb9cff..a8b864be6 100644
--- a/fodc/proxy/internal/api/server.go
+++ b/fodc/proxy/internal/api/server.go
@@ -270,82 +270,137 @@ func (s *Server) handleHealth(w http.ResponseWriter, r
*http.Request) {
}
// formatPrometheusText formats aggregated metrics as Prometheus text format.
+//
+// Metrics with a known Type (non-empty) are emitted using the real type.
Metrics with
+// Type=="" (pre-upgrade agents) are folded into a matching typed family when
one exists;
+// otherwise they take the legacy suffix-heuristic path, so behavior is unchanged for them.
func (s *Server) formatPrometheusText(aggregatedMetrics
[]*metrics.AggregatedMetric) string {
if len(aggregatedMetrics) == 0 {
return ""
}
- metricMap := make(map[string]*metricGroup)
- histogramBases := make(map[string]bool)
+ // Partition into typed (type known from agent) and untyped
(legacy/unknown).
+ var typed []*metrics.AggregatedMetric
+ var untyped []*metrics.AggregatedMetric
+ for _, m := range aggregatedMetrics {
+ if m.Type != "" {
+ typed = append(typed, m)
+ } else {
+ untyped = append(untyped, m)
+ }
+ }
- for _, metric := range aggregatedMetrics {
- key := metric.Name
- group, exists := metricMap[key]
+ var builder strings.Builder
+ remainingUntyped := writeTypedFamilies(&builder, typed, untyped)
+ writeUntypedFamilies(&builder, remainingUntyped)
+ return builder.String()
+}
+
+// writeTypedFamilies groups typed metrics by family base and emits one "#
TYPE" line per
+// family using the real type. The family base for histogram/summary component
series
+// (_bucket/_sum/_count) is the name with the suffix stripped; bare summary
quantile series
+// and all counter/gauge/untyped series use the name as-is. Untyped metrics
whose base
+// collides with a typed family are absorbed under the authoritative typed
line — this
+// prevents a mixed-version rollout from emitting two conflicting "# TYPE"
lines for one
+// name. The untyped metrics with no typed-family collision are returned for
the legacy path.
+func writeTypedFamilies(builder *strings.Builder, typed, untyped
[]*metrics.AggregatedMetric) []*metrics.AggregatedMetric {
+ typedFamilyOrder := make([]string, 0)
+ typedFamilies := make(map[string]*metricGroup) // base → group
+ for _, m := range typed {
+ base := typedFamilyBase(m.Name, m.Type)
+ grp, exists := typedFamilies[base]
if !exists {
- group = &metricGroup{
- name: metric.Name,
- description: metric.Description,
+ grp = &metricGroup{
+ name: base,
+ description: m.Description,
+ metricType: m.Type,
metrics: make([]*metrics.AggregatedMetric,
0),
}
- metricMap[key] = group
+ typedFamilies[base] = grp
+ typedFamilyOrder = append(typedFamilyOrder, base)
}
- group.metrics = append(group.metrics, metric)
-
- if strings.HasSuffix(metric.Name, "_bucket") ||
- strings.HasSuffix(metric.Name, "_sum") ||
- strings.HasSuffix(metric.Name, "_count") {
- baseName := getHistogramBaseName(metric.Name)
- if baseName != "" {
- histogramBases[baseName] = true
+ grp.metrics = append(grp.metrics, m)
+ }
+
+ remainingUntyped := make([]*metrics.AggregatedMetric, 0, len(untyped))
+ for _, m := range untyped {
+ grp := matchTypedFamily(typedFamilies, m.Name)
+ if grp == nil {
+ remainingUntyped = append(remainingUntyped, m)
+ continue
+ }
+ if grp.description == "" && m.Description != "" {
+ grp.description = m.Description
+ }
+ grp.metrics = append(grp.metrics, m)
+ }
+
+ sort.Strings(typedFamilyOrder)
+ for _, base := range typedFamilyOrder {
+ grp := typedFamilies[base]
+ if grp.description != "" {
+ builder.WriteString(fmt.Sprintf("# HELP %s %s\n", base,
grp.description))
+ }
+ builder.WriteString(fmt.Sprintf("# TYPE %s %s\n", base,
grp.metricType))
+ for _, m := range grp.metrics {
+ builder.WriteString(formatMetricLine(m))
+ }
+ }
+ return remainingUntyped
+}
+
+// writeUntypedFamilies emits metrics with no known type using the legacy
suffix heuristic:
+// a base name with a _bucket/_sum/_count sibling is treated as a histogram,
everything else
+// as a gauge. This preserves pre-typed behavior for pre-upgrade agents.
+func writeUntypedFamilies(builder *strings.Builder, untyped
[]*metrics.AggregatedMetric) {
+ if len(untyped) == 0 {
+ return
+ }
+ metricMap := make(map[string]*metricGroup)
+ histogramBases := make(map[string]bool)
+ for _, m := range untyped {
+ grp, exists := metricMap[m.Name]
+ if !exists {
+ grp = &metricGroup{
+ name: m.Name,
+ description: m.Description,
+ metrics: make([]*metrics.AggregatedMetric,
0),
}
+ metricMap[m.Name] = grp
+ }
+ grp.metrics = append(grp.metrics, m)
+ if baseName := getHistogramBaseName(m.Name); baseName != "" {
+ histogramBases[baseName] = true
}
}
- histogramMetrics := make(map[string][]*metrics.AggregatedMetric)
+ histogramMetricsMap := make(map[string][]*metrics.AggregatedMetric)
regularMetrics := make(map[string]*metricGroup)
-
- for name, group := range metricMap {
+ for name, grp := range metricMap {
baseName := getHistogramBaseName(name)
if baseName != "" && histogramBases[baseName] {
- histogramMetrics[baseName] =
append(histogramMetrics[baseName], group.metrics...)
+ histogramMetricsMap[baseName] =
append(histogramMetricsMap[baseName], grp.metrics...)
} else {
- regularMetrics[name] = group
+ regularMetrics[name] = grp
}
}
- var builder strings.Builder
-
histogramNames := make([]string, 0, len(histogramBases))
for baseName := range histogramBases {
histogramNames = append(histogramNames, baseName)
}
sort.Strings(histogramNames)
-
for _, baseName := range histogramNames {
- allMetrics := histogramMetrics[baseName]
- if len(allMetrics) == 0 {
+ allM := histogramMetricsMap[baseName]
+ if len(allM) == 0 {
continue
}
-
- description := allMetrics[0].Description
- if description != "" {
+ if description := allM[0].Description; description != "" {
builder.WriteString(fmt.Sprintf("# HELP %s %s\n",
baseName, description))
}
builder.WriteString(fmt.Sprintf("# TYPE %s histogram\n",
baseName))
-
- for _, metric := range allMetrics {
- labelParts := make([]string, 0, len(metric.Labels))
- for key, value := range metric.Labels {
- labelParts = append(labelParts,
fmt.Sprintf(`%s="%s"`, key, value))
- }
- sort.Strings(labelParts)
-
- labelStr := ""
- if len(labelParts) > 0 {
- labelStr = "{" + strings.Join(labelParts, ",")
+ "}"
- }
-
- builder.WriteString(fmt.Sprintf("%s%s %s\n",
metric.Name, labelStr, formatFloat(metric.Value)))
+ for _, m := range allM {
+ builder.WriteString(formatMetricLine(m))
}
}
@@ -354,31 +409,76 @@ func (s *Server) formatPrometheusText(aggregatedMetrics
[]*metrics.AggregatedMet
regularNames = append(regularNames, name)
}
sort.Strings(regularNames)
-
for _, name := range regularNames {
- group := regularMetrics[name]
- if group.description != "" {
- builder.WriteString(fmt.Sprintf("# HELP %s %s\n",
group.name, group.description))
+ grp := regularMetrics[name]
+ if grp.description != "" {
+ builder.WriteString(fmt.Sprintf("# HELP %s %s\n",
grp.name, grp.description))
+ }
+ builder.WriteString(fmt.Sprintf("# TYPE %s gauge\n", grp.name))
+ for _, m := range grp.metrics {
+ builder.WriteString(formatMetricLine(m))
}
- builder.WriteString(fmt.Sprintf("# TYPE %s gauge\n",
group.name))
+ }
+}
- for _, metric := range group.metrics {
- labelParts := make([]string, 0, len(metric.Labels))
- for key, value := range metric.Labels {
- labelParts = append(labelParts,
fmt.Sprintf(`%s="%s"`, key, value))
+// matchTypedFamily returns the typed family an untyped series belongs to, or
nil if none.
+// It checks the exact name first (counter/gauge/untyped series and bare
summary quantile
+// series), then the histogram/summary component base after trimming a trailing
+// _bucket/_sum/_count suffix. Used to fold pre-upgrade (untyped) samples into
the
+// authoritative typed family so the proxy never emits two TYPE lines for one
name.
+func matchTypedFamily(typedFamilies map[string]*metricGroup, name string)
*metricGroup {
+ if grp, ok := typedFamilies[name]; ok {
+ return grp
+ }
+ for _, suffix := range []string{"_bucket", "_sum", "_count"} {
+ if strings.HasSuffix(name, suffix) {
+ base := strings.TrimSuffix(name, suffix)
+ if base != "" {
+ if grp, ok := typedFamilies[base]; ok {
+ return grp
+ }
}
- sort.Strings(labelParts)
+ break
+ }
+ }
+ return nil
+}
- labelStr := ""
- if len(labelParts) > 0 {
- labelStr = "{" + strings.Join(labelParts, ",")
+ "}"
+// typedFamilyBase returns the Prometheus family base name for a typed series.
+// For histogram/summary component series the trailing _bucket/_sum/_count is
stripped.
+// For all other types (counter, gauge, untyped) the name is used as-is.
+func typedFamilyBase(name, metricType string) string {
+ switch metricType {
+ case "histogram", "summary":
+ for _, suffix := range []string{"_bucket", "_sum", "_count"} {
+ if strings.HasSuffix(name, suffix) {
+ base := strings.TrimSuffix(name, suffix)
+ if base != "" {
+ return base
+ }
}
-
- builder.WriteString(fmt.Sprintf("%s%s %s\n",
group.name, labelStr, formatFloat(metric.Value)))
}
}
+ return name
+}
- return builder.String()
+// labelValueEscaper escapes the three characters that are special inside a
Prometheus
+// text-exposition label value: backslash, double-quote, and line feed. The
replacements are
+// applied in a single left-to-right pass, so an escaped backslash is not
re-escaped.
+var labelValueEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`)
+
+// formatMetricLine renders a single metric sample as a Prometheus text line.
+func formatMetricLine(m *metrics.AggregatedMetric) string {
+ labelParts := make([]string, 0, len(m.Labels))
+ for k, v := range m.Labels {
+ labelParts = append(labelParts, fmt.Sprintf(`%s="%s"`, k,
labelValueEscaper.Replace(v)))
+ }
+ sort.Strings(labelParts)
+ labelStr := ""
+ if len(labelParts) > 0 {
+ labelStr = "{" + strings.Join(labelParts, ",") + "}"
+ }
+ return fmt.Sprintf("%s%s %s\n", m.Name, labelStr, formatFloat(m.Value))
}
// formatMetricsWindowJSON formats aggregated metrics as JSON for
metrics-windows endpoint.
@@ -480,6 +580,7 @@ func getHistogramBaseName(metricName string) string {
type metricGroup struct {
name string
description string
+ metricType string
metrics []*metrics.AggregatedMetric
}
diff --git a/fodc/proxy/internal/api/server_test.go
b/fodc/proxy/internal/api/server_test.go
index b48e24bfd..56dda90f9 100644
--- a/fodc/proxy/internal/api/server_test.go
+++ b/fodc/proxy/internal/api/server_test.go
@@ -102,6 +102,11 @@ func newTestServerOnly(t *testing.T) *Server {
}
func createTestMetric(name string, value float64, labels map[string]string,
timestamp time.Time) *metrics.AggregatedMetric {
+ return createTestMetricWithType(name, value, labels, timestamp, "")
+}
+
+// createTestMetricWithType creates a test metric with an explicit Prometheus
type.
+func createTestMetricWithType(name string, value float64, labels
map[string]string, timestamp time.Time, metricType string)
*metrics.AggregatedMetric {
if labels == nil {
labels = make(map[string]string)
}
@@ -112,6 +117,7 @@ func createTestMetric(name string, value float64, labels
map[string]string, time
Timestamp: timestamp,
AgentID: "test-agent-1",
Description: "Test metric description",
+ Type: metricType,
}
}
@@ -970,6 +976,164 @@ func TestHandleClusterLifecycle_DataInfoEmitted(t *testing.T) {
require.Len(t, dataInfo, 1)
}
+// TestFormatPrometheusText_CounterType verifies that a metric with type=counter emits
+// "# TYPE foo_total counter" and is NOT mistaken for a histogram (invariant 2).
+func TestFormatPrometheusText_CounterType(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("http_requests_total", 42, map[string]string{"method": "GET"}, now, "counter"),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ assert.Contains(t, result, "# TYPE http_requests_total counter")
+ assert.Contains(t, result, "http_requests_total{")
+ assert.NotContains(t, result, "# TYPE http_requests_total gauge")
+ assert.NotContains(t, result, "# TYPE http_requests_total histogram")
+}
+
+// TestFormatPrometheusText_CounterWithCountSuffix verifies that a counter named foo_count
+// (not a histogram component) keeps type=counter and is NOT folded into histogram base "foo"
+// (invariant 3 — the key fix for the mislabeled-histogram defect).
+func TestFormatPrometheusText_CounterWithCountSuffix(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("banyandb_grpc_request_count", 100, map[string]string{"pod_name": "p1"}, now, "counter"),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ assert.Contains(t, result, "# TYPE banyandb_grpc_request_count counter")
+ // Must NOT be folded into a histogram base called "banyandb_grpc_request"
+ assert.NotContains(t, result, "# TYPE banyandb_grpc_request histogram")
+ assert.Contains(t, result, "banyandb_grpc_request_count{")
+}
+
+// TestFormatPrometheusText_HistogramTyped verifies that histogram component series with
+// type=histogram share exactly one "# TYPE foo histogram" line (invariant 4).
+func TestFormatPrometheusText_HistogramTyped(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ labels := map[string]string{"pod_name": "p1"}
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("request_latency_bucket", 10, labels, now, "histogram"),
+ createTestMetricWithType("request_latency_sum", 1.5, labels, now, "histogram"),
+ createTestMetricWithType("request_latency_count", 10, labels, now, "histogram"),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ // Exactly one TYPE line for the base name.
+ assert.Equal(t, 1, strings.Count(result, "# TYPE request_latency histogram"),
+ "must emit exactly one TYPE histogram line")
+ assert.Contains(t, result, "request_latency_bucket{")
+ assert.Contains(t, result, "request_latency_sum{")
+ assert.Contains(t, result, "request_latency_count{")
+ // Component names must not get their own TYPE lines.
+ assert.NotContains(t, result, "# TYPE request_latency_bucket")
+ assert.NotContains(t, result, "# TYPE request_latency_sum")
+ assert.NotContains(t, result, "# TYPE request_latency_count")
+}
+
+// TestFormatPrometheusText_SummaryTyped verifies that summary families emit exactly one
+// "# TYPE foo summary" line and all series (quantile, _sum, _count) fall under it (invariant 5).
+func TestFormatPrometheusText_SummaryTyped(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ labelsQ := map[string]string{"quantile": "0.5"}
+ labelsBase := map[string]string{}
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("go_gc_duration_seconds", 0.0001, labelsQ, now, "summary"),
+ createTestMetricWithType("go_gc_duration_seconds_sum", 0.5, labelsBase, now, "summary"),
+ createTestMetricWithType("go_gc_duration_seconds_count", 42, labelsBase, now, "summary"),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ // Exactly one TYPE line for the base name.
+ assert.Equal(t, 1, strings.Count(result, "# TYPE go_gc_duration_seconds summary"),
+ "must emit exactly one TYPE summary line")
+ assert.Contains(t, result, "go_gc_duration_seconds{")
+ assert.Contains(t, result, "go_gc_duration_seconds_sum")
+ assert.Contains(t, result, "go_gc_duration_seconds_count")
+ // Must NOT emit a separate gauge TYPE line for the bare name.
+ assert.NotContains(t, result, "# TYPE go_gc_duration_seconds gauge")
+}
+
+// TestFormatPrometheusText_MixedVersionDedup verifies that when the same metric family
+// arrives both typed (upgraded agent) and untyped (pre-upgrade agent) in one scrape, the
+// proxy emits exactly ONE "# TYPE" line — the typed one — and folds the untyped samples
+// under it, rather than emitting a conflicting second TYPE line that Prometheus rejects.
+func TestFormatPrometheusText_MixedVersionDedup(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("process_cpu_seconds_total", 5, map[string]string{"pod_name": "new"}, now, "counter"),
+ createTestMetricWithType("process_cpu_seconds_total", 3, map[string]string{"pod_name": "old"}, now, ""),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ assert.Equal(t, 1, strings.Count(result, "# TYPE process_cpu_seconds_total"),
+ "must emit exactly one TYPE line for the family")
+ assert.Contains(t, result, "# TYPE process_cpu_seconds_total counter")
+ assert.NotContains(t, result, "# TYPE process_cpu_seconds_total gauge")
+ // Both the upgraded and pre-upgrade samples must still be present.
+ assert.Contains(t, result, `pod_name="new"`)
+ assert.Contains(t, result, `pod_name="old"`)
+}
+
+// TestFormatPrometheusText_MixedVersionHistogramDedup verifies the same dedup for histogram
+// component series: untyped _bucket/_sum/_count samples from a pre-upgrade agent fold under
+// the typed histogram family rather than spawning a second (conflicting) TYPE line.
+func TestFormatPrometheusText_MixedVersionHistogramDedup(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ newPod := map[string]string{"le": "0.1", "pod_name": "new"}
+ oldPod := map[string]string{"le": "0.1", "pod_name": "old"}
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("req_latency_bucket", 10, newPod, now, "histogram"),
+ createTestMetricWithType("req_latency_sum", 1.0, map[string]string{"pod_name": "new"}, now, "histogram"),
+ createTestMetricWithType("req_latency_count", 10, map[string]string{"pod_name": "new"}, now, "histogram"),
+ createTestMetricWithType("req_latency_bucket", 8, oldPod, now, ""),
+ createTestMetricWithType("req_latency_sum", 0.8, map[string]string{"pod_name": "old"}, now, ""),
+ createTestMetricWithType("req_latency_count", 8, map[string]string{"pod_name": "old"}, now, ""),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ assert.Equal(t, 1, strings.Count(result, "# TYPE req_latency histogram"),
+ "must emit exactly one histogram TYPE line for the family")
+ // No component or gauge TYPE lines leaked from the legacy path.
+ assert.NotContains(t, result, "# TYPE req_latency_bucket")
+ assert.NotContains(t, result, "# TYPE req_latency_sum")
+ assert.NotContains(t, result, "# TYPE req_latency_count")
+ assert.NotContains(t, result, "# TYPE req_latency gauge")
+ // Pre-upgrade samples still present.
+ assert.Contains(t, result, `pod_name="old"`)
+}
+
+// TestFormatPrometheusText_EscapesLabelValues verifies that label values containing the
+// Prometheus-special characters (backslash, double-quote, line feed) are escaped, so a value
+// with special characters cannot produce malformed exposition output that breaks the scrape.
+func TestFormatPrometheusText_EscapesLabelValues(t *testing.T) {
+ server := newTestServerOnly(t)
+ now := time.Now()
+ metricsList := []*metrics.AggregatedMetric{
+ createTestMetricWithType("http_requests_total", 1, map[string]string{
+ "path": `a"b\c` + "\n" + "d",
+ }, now, "counter"),
+ }
+
+ result := server.formatPrometheusText(metricsList)
+
+ assert.Contains(t, result, `path="a\"b\\c\nd"`)
+ // The raw, unescaped sequence must not leak into the output.
+ assert.NotContains(t, result, `path="a"b\c`)
+}
+
func TestHandleClusterLifecycle_StatusReportsUseProtoJSON(t *testing.T) {
initTestLogger(t)
testLogger := logger.GetLogger("test", "api")
diff --git a/fodc/proxy/internal/metrics/aggregator.go b/fodc/proxy/internal/metrics/aggregator.go
index 6224cb560..986352a62 100644
--- a/fodc/proxy/internal/metrics/aggregator.go
+++ b/fodc/proxy/internal/metrics/aggregator.go
@@ -44,6 +44,7 @@ type AggregatedMetric struct {
Name string
AgentID string
Description string
+ Type string // lowercase prometheus type: "counter","gauge","histogram","summary","untyped",""
Value float64
}
@@ -88,6 +89,25 @@ func (ma *Aggregator) SetGRPCService(grpcService RequestSender) {
ma.grpcService = grpcService
}
+// protoMetricTypeToString converts a proto MetricType enum to its lowercase string representation.
+// UNSPECIFIED maps to "" (unknown, causes proxy to fall back to suffix heuristic).
+func protoMetricTypeToString(mt fodcv1.MetricType) string {
+ switch mt {
+ case fodcv1.MetricType_METRIC_TYPE_COUNTER:
+ return "counter"
+ case fodcv1.MetricType_METRIC_TYPE_GAUGE:
+ return "gauge"
+ case fodcv1.MetricType_METRIC_TYPE_HISTOGRAM:
+ return "histogram"
+ case fodcv1.MetricType_METRIC_TYPE_SUMMARY:
+ return "summary"
+ case fodcv1.MetricType_METRIC_TYPE_UNTYPED:
+ return "untyped"
+ default:
+ return ""
+ }
+}
+
// ProcessMetricsFromAgent processes metrics received from an agent.
func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID string, agentInfo *registry.AgentInfo, req *fodcv1.StreamMetricsRequest) error {
aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics))
@@ -119,6 +139,7 @@ func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID strin
Timestamp: timestamp,
AgentID: agentID,
Description: metric.Description,
+ Type: protoMetricTypeToString(metric.Type),
}
aggregatedMetrics = append(aggregatedMetrics, aggregatedMetric)
diff --git a/go.mod b/go.mod
index 93463d9f8..4aec11520 100644
--- a/go.mod
+++ b/go.mod
@@ -176,7 +176,7 @@ require (
github.com/pelletier/go-toml/v2 v2.3.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
- github.com/prometheus/client_model v0.6.2 // indirect
+ github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.67.5
github.com/prometheus/procfs v0.20.1 // indirect
github.com/robfig/cron/v3 v3.0.1