This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 097941a1191276e0d0fabcb36bd165a674f217c0 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Jun 4 13:55:56 2026 +0800 Thread Prometheus metric type end-to-end through FODC agent and proxy (#1153) --- CHANGES.md | 1 + api/proto/banyandb/fodc/v1/rpc.proto | 11 + docs/api-reference.md | 19 ++ fodc/agent/internal/exporter/exporter.go | 19 +- .../internal/exporter/prometheus_format_test.go | 3 +- fodc/agent/internal/flightrecorder/datasource.go | 20 ++ fodc/agent/internal/metrics/parse.go | 57 ++++++ fodc/agent/internal/metrics/parse_test.go | 45 +++++ fodc/agent/internal/proxy/client.go | 27 ++- fodc/agent/internal/proxy/client_test.go | 9 +- fodc/agent/testhelper/flightrecorder.go | 2 + fodc/proxy/internal/api/server.go | 221 +++++++++++++++------ fodc/proxy/internal/api/server_test.go | 164 +++++++++++++++ fodc/proxy/internal/metrics/aggregator.go | 21 ++ go.mod | 4 +- 15 files changed, 554 insertions(+), 69 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c0928ad31..bd27150f3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ Release Notes. ### Bug Fixes - Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator. +- Fix FODC proxy corrupting Prometheus metric types. The agent dropped the `# TYPE` line while parsing banyandb `/metrics`, the `StreamMetrics` proto carried no type field, and the proxy guessed the type from a name-suffix heuristic — downgrading counters to gauge, mislabeling `_count`-suffixed counters as histograms, and splitting summaries into two conflicting `# TYPE` lines. Capture the type with the Prometheus `expfmt` parser, store it in the flight recorder, thread it through a new [...] ## 0.10.2 diff --git a/api/proto/banyandb/fodc/v1/rpc.proto b/api/proto/banyandb/fodc/v1/rpc.proto index 9315da8a6..cfc48227d 100644 --- a/api/proto/banyandb/fodc/v1/rpc.proto +++ b/api/proto/banyandb/fodc/v1/rpc.proto @@ -60,12 +60,23 @@ message StreamMetricsRequest { google.protobuf.Timestamp timestamp = 2; } +// MetricType represents the Prometheus metric type. +enum MetricType { + METRIC_TYPE_UNSPECIFIED = 0; + METRIC_TYPE_GAUGE = 1; + METRIC_TYPE_COUNTER = 2; + METRIC_TYPE_HISTOGRAM = 3; + METRIC_TYPE_SUMMARY = 4; + METRIC_TYPE_UNTYPED = 5; +} + message Metric { string name = 1; map<string, string> labels = 2; double value = 3; string description = 4; google.protobuf.Timestamp timestamp = 5; + MetricType type = 6; } message StreamMetricsResponse { diff --git a/docs/api-reference.md b/docs/api-reference.md index 2c12c584f..89b9bd6c1 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -324,6 +324,8 @@ - [StreamMetricsResponse](#banyandb-fodc-v1-StreamMetricsResponse) - [Topology](#banyandb-fodc-v1-Topology) + - [MetricType](#banyandb-fodc-v1-MetricType) + - [FODCService](#banyandb-fodc-v1-FODCService) - [GroupLifecycleService](#banyandb-fodc-v1-GroupLifecycleService) @@ -4850,6 +4852,7 @@ Phase represents the current phase of the deletion task. | value | [double](#double) | | | | description | [string](#string) | | | | timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | | +| type | [MetricType](#banyandb-fodc-v1-MetricType) | | | @@ -5036,6 +5039,22 @@ Phase represents the current phase of the deletion task. + +<a name="banyandb-fodc-v1-MetricType"></a> + +### MetricType +MetricType represents the Prometheus metric type. + +| Name | Number | Description | +| ---- | ------ | ----------- | +| METRIC_TYPE_UNSPECIFIED | 0 | | +| METRIC_TYPE_GAUGE | 1 | | +| METRIC_TYPE_COUNTER | 2 | | +| METRIC_TYPE_HISTOGRAM | 3 | | +| METRIC_TYPE_SUMMARY | 4 | | +| METRIC_TYPE_UNTYPED | 5 | | + + diff --git a/fodc/agent/internal/exporter/exporter.go b/fodc/agent/internal/exporter/exporter.go index 20569a19c..c50716a3e 100644 --- a/fodc/agent/internal/exporter/exporter.go +++ b/fodc/agent/internal/exporter/exporter.go @@ -87,6 +87,21 @@ func (dc *DatasourceCollector) Describe(ch chan<- *prometheus.Desc) { } } +// chooseValueType maps a lowercase prometheus type string to a prometheus.ValueType +// suitable for NewConstMetric. Histogram and summary cannot be represented as scalar +// const metrics, so they degrade to UntypedValue. +func chooseValueType(t string) prometheus.ValueType { + switch t { + case "counter": + return prometheus.CounterValue + case "histogram", "summary", "untyped": + return prometheus.UntypedValue + default: + // gauge or "" (unknown) → gauge + return prometheus.GaugeValue + } +} + // Collect implements prometheus.Collector interface. // It collects metrics from all Datasources and sends them to the channel. func (dc *DatasourceCollector) Collect(ch chan<- prometheus.Metric) { @@ -94,6 +109,7 @@ func (dc *DatasourceCollector) Collect(ch chan<- prometheus.Metric) { for _, ds := range datasources { metricsMap := ds.GetMetrics() descriptions := ds.GetDescriptions() + types := ds.GetTypes() timestamps := ds.GetTimestamps() for metricKeyStr, metricBuffer := range metricsMap { @@ -128,9 +144,10 @@ func (dc *DatasourceCollector) Collect(ch chan<- prometheus.Metric) { labelValues[idx] = labelValueMap[labelName] } + metricType := metrics.ResolveMetricType(types, parsedKey.name) metric, createErr := prometheus.NewConstMetric( desc, - prometheus.GaugeValue, + chooseValueType(metricType), currentValue, labelValues..., ) diff --git a/fodc/agent/internal/exporter/prometheus_format_test.go b/fodc/agent/internal/exporter/prometheus_format_test.go index ea854a571..25665d5d2 100644 --- a/fodc/agent/internal/exporter/prometheus_format_test.go +++ b/fodc/agent/internal/exporter/prometheus_format_test.go @@ -44,6 +44,7 @@ func TestPrometheusFormatConversion(t *testing.T) { Value: 100.0, Labels: []metrics.Label{{Name: "method", Value: "GET"}, {Name: "status", Value: "200"}}, Desc: "Total number of HTTP requests", + Type: "counter", }, { Name: "cpu_usage", @@ -126,7 +127,7 @@ func TestPrometheusFormatConversion(t *testing.T) { // Check TYPE lines if strings.HasPrefix(line, "# TYPE") { - if strings.Contains(line, "http_requests_total") && strings.Contains(line, "gauge") { + if strings.Contains(line, "http_requests_total") && strings.Contains(line, "counter") { hasTypeHTTPRequests = true } if strings.Contains(line, "cpu_usage") && strings.Contains(line, "gauge") { diff --git a/fodc/agent/internal/flightrecorder/datasource.go b/fodc/agent/internal/flightrecorder/datasource.go index e8afff56c..747b467b0 100644 --- a/fodc/agent/internal/flightrecorder/datasource.go +++ b/fodc/agent/internal/flightrecorder/datasource.go @@ -67,6 +67,7 @@ func UpdateTimestampRingBuffer(trb *TimestampRingBuffer, v int64) { type Datasource struct { metrics map[string]*MetricRingBuffer // Map from metric name+labels to RingBuffer storing metric values descriptions map[string]string // Map from metric name to HELP content descriptions + types map[string]string // Map from metric name to Prometheus type string timestamps *TimestampRingBuffer // RingBuffer storing timestamps for each polling cycle mu sync.RWMutex CapacitySize int64 // Memory limit in bytes @@ -78,6 +79,7 @@ func NewDatasource() *Datasource { metrics: make(map[string]*MetricRingBuffer), timestamps: NewTimestampRingBuffer(), descriptions: make(map[string]string), + types: make(map[string]string), CapacitySize: 0, } } @@ -103,6 +105,9 @@ func (ds *Datasource) Update(m *metrics.RawMetric) error { if m.Desc != "" { ds.descriptions[m.Name] = m.Desc } + if m.Type != "" { + ds.types[m.Name] = m.Type + } UpdateMetricRingBuffer(ds.metrics[metricKey], m.Value) return nil @@ -140,6 +145,9 @@ func (ds *Datasource) UpdateBatch(rawMetrics []metrics.RawMetric, timestamp int6 if m.Desc != "" { ds.descriptions[m.Name] = m.Desc } + if m.Type != "" { + ds.types[m.Name] = m.Type + } buffer := ds.metrics[metricKey] UpdateMetricRingBuffer(buffer, m.Value) updatedBuffers = append(updatedBuffers, buffer) @@ -259,3 +267,15 @@ func (ds *Datasource) GetDescriptions() map[string]string { } return result } + +// GetTypes returns a copy of the types map. +func (ds *Datasource) GetTypes() map[string]string { + ds.mu.RLock() + defer ds.mu.RUnlock() + + result := make(map[string]string) + for k, v := range ds.types { + result[k] = v + } + return result +} diff --git a/fodc/agent/internal/metrics/parse.go b/fodc/agent/internal/metrics/parse.go index 5a4375142..974e960d5 100644 --- a/fodc/agent/internal/metrics/parse.go +++ b/fodc/agent/internal/metrics/parse.go @@ -24,6 +24,10 @@ import ( "sort" "strconv" "strings" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" ) // Label represents a metric label as a key-value pair. @@ -36,6 +40,7 @@ type Label struct { type RawMetric struct { Name string Desc string + Type string // lowercase prometheus type: "counter","gauge","histogram","summary","untyped","" Labels []Label Value float64 } @@ -76,8 +81,59 @@ var ( labelRegex = regexp.MustCompile(`(\w+)="([^"]+)"`) ) +// metricTypeToString converts a dto.MetricType to its lowercase Prometheus text representation. +func metricTypeToString(mt dto.MetricType) string { + switch mt { + case dto.MetricType_COUNTER: + return "counter" + case dto.MetricType_GAUGE: + return "gauge" + case dto.MetricType_SUMMARY: + return "summary" + case dto.MetricType_HISTOGRAM: + return "histogram" + case dto.MetricType_UNTYPED: + return "untyped" + default: + return "" + } +} + +// ResolveMetricType resolves the metric type for a given series name using the typeMap. +// For histogram/summary component series (ending in _bucket/_sum/_count), the base family +// name is used to look up the type. Returns "" if unknown. +func ResolveMetricType(typeMap map[string]string, name string) string { + if t, ok := typeMap[name]; ok { + return t + } + // Trim histogram/summary suffixes to find the base family name. + for _, suffix := range []string{"_bucket", "_sum", "_count"} { + if strings.HasSuffix(name, suffix) { + base := strings.TrimSuffix(name, suffix) + if base != "" { + if t, ok := typeMap[base]; ok { + return t + } + } + break + } + } + return "" +} + // ParseWithAgentLabels parses Prometheus text format metrics and returns structured RawMetric objects. func ParseWithAgentLabels(text string, nodeRole, podName, containerName string) ([]RawMetric, error) { + // Build authoritative type map using expfmt. + // TextToMetricFamilies may return a non-nil error AND a non-empty families map + // simultaneously (e.g. for "unexpected end of input stream" on trailing newline). + // We always use whatever families were successfully parsed, ignoring parse errors. + typeMap := make(map[string]string) + parser := expfmt.NewTextParser(model.LegacyValidation) + families, _ := parser.TextToMetricFamilies(strings.NewReader(text)) + for familyName, mf := range families { + typeMap[familyName] = metricTypeToString(mf.GetType()) + } + lines := strings.Split(text, "\n") var metrics []RawMetric helpMap := make(map[string]string) @@ -145,6 +201,7 @@ func ParseWithAgentLabels(text string, nodeRole, podName, containerName string) Labels: labels, Value: value, Desc: desc, + Type: ResolveMetricType(typeMap, metricName), }) } diff --git a/fodc/agent/internal/metrics/parse_test.go b/fodc/agent/internal/metrics/parse_test.go index fab8c5fdf..efb4adff4 100644 --- a/fodc/agent/internal/metrics/parse_test.go +++ b/fodc/agent/internal/metrics/parse_test.go @@ -1076,6 +1076,51 @@ func TestParse_LabelValueWithNumbers(t *testing.T) { assert.Equal(t, "value123", metrics[0].Labels[0].Value) } +// TestParse_TypePropagation verifies that expfmt-derived types are correctly set on +// RawMetric.Type, including base-name resolution for histogram component series. +func TestParse_TypePropagation(t *testing.T) { + text := `# HELP http_requests_total Total HTTP requests +# TYPE http_requests_total counter +http_requests_total{method="GET"} 1234 +# HELP request_latency Request latency histogram +# TYPE request_latency histogram +request_latency_bucket{le="0.1"} 10 +request_latency_bucket{le="0.5"} 25 +request_latency_bucket{le="+Inf"} 30 +request_latency_sum 12.5 +request_latency_count 30 +# HELP cpu_usage CPU usage +# TYPE cpu_usage gauge +cpu_usage 75.5` + + result, parseErr := ParseWithAgentLabels(text, "", "", "") + + require.NoError(t, parseErr) + + // Build a name→type map for easy assertion. + typeByName := make(map[string][]string) + for _, m := range result { + typeByName[m.Name] = append(typeByName[m.Name], m.Type) + } + + // Counter: all series for http_requests_total must be "counter". + for _, typ := range typeByName["http_requests_total"] { + assert.Equal(t, "counter", typ, "http_requests_total should be counter") + } + + // Gauge. + for _, typ := range typeByName["cpu_usage"] { + assert.Equal(t, "gauge", typ, "cpu_usage should be gauge") + } + + // Histogram component series must all resolve to "histogram". + for _, name := range []string{"request_latency_bucket", "request_latency_sum", "request_latency_count"} { + for _, typ := range typeByName[name] { + assert.Equal(t, "histogram", typ, "%s should resolve to histogram via base name", name) + } + } +} + func TestParse_WithAgentIdentityLabels(t *testing.T) { text := `cpu_usage{instance="localhost"} 75.5 http_requests_total{method="GET"} 100` diff --git a/fodc/agent/internal/proxy/client.go b/fodc/agent/internal/proxy/client.go index 26752365d..a6ff1eae8 100644 --- a/fodc/agent/internal/proxy/client.go +++ b/fodc/agent/internal/proxy/client.go @@ -518,6 +518,7 @@ func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter *MetricsReques allMetrics := ds.GetMetrics() timestamps := ds.GetTimestamps() descriptions := ds.GetDescriptions() + types := ds.GetTypes() if filter != nil && (filter.StartTime != nil || filter.EndTime != nil) { if timestamps == nil { @@ -562,7 +563,7 @@ func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter *MetricsReques c.streamsMu.Unlock() return fmt.Errorf("metrics stream changed during send") } - sendErr := c.sendFilteredMetrics(metricsStream, allMetrics, timestampValues, descriptions, filter) + sendErr := c.sendFilteredMetrics(metricsStream, allMetrics, timestampValues, descriptions, types, filter) c.streamsMu.Unlock() return sendErr } @@ -572,16 +573,35 @@ func (c *Client) RetrieveAndSendMetrics(_ context.Context, filter *MetricsReques c.streamsMu.Unlock() return fmt.Errorf("metrics stream changed during send") } - sendErr := c.sendLatestMetrics(metricsStream, allMetrics, descriptions) + sendErr := c.sendLatestMetrics(metricsStream, allMetrics, descriptions, types) c.streamsMu.Unlock() return sendErr } +// toProtoMetricType converts a lowercase prometheus type string to the proto enum. +func toProtoMetricType(t string) fodcv1.MetricType { + switch t { + case "counter": + return fodcv1.MetricType_METRIC_TYPE_COUNTER + case "gauge": + return fodcv1.MetricType_METRIC_TYPE_GAUGE + case "histogram": + return fodcv1.MetricType_METRIC_TYPE_HISTOGRAM + case "summary": + return fodcv1.MetricType_METRIC_TYPE_SUMMARY + case "untyped": + return fodcv1.MetricType_METRIC_TYPE_UNTYPED + default: + return fodcv1.MetricType_METRIC_TYPE_UNSPECIFIED + } +} + // sendLatestMetrics sends the latest metrics (most recent values). func (c *Client) sendLatestMetrics( stream fodcv1.FODCService_StreamMetricsClient, allMetrics map[string]*flightrecorder.MetricRingBuffer, descriptions map[string]string, + types map[string]string, ) error { protoMetrics := make([]*fodcv1.Metric, 0) @@ -609,6 +629,7 @@ func (c *Client) sendLatestMetrics( Labels: labelsMap, Value: metricValue, Description: descriptions[parsedKey.Name], + Type: toProtoMetricType(metrics.ResolveMetricType(types, parsedKey.Name)), } protoMetrics = append(protoMetrics, protoMetric) @@ -632,6 +653,7 @@ func (c *Client) sendFilteredMetrics( allMetrics map[string]*flightrecorder.MetricRingBuffer, timestampValues []int64, descriptions map[string]string, + types map[string]string, filter *MetricsRequestFilter, ) error { protoMetrics := make([]*fodcv1.Metric, 0) @@ -677,6 +699,7 @@ func (c *Client) sendFilteredMetrics( Value: metricValues[idx], Description: description, Timestamp: timestamppb.New(timestamp), + Type: toProtoMetricType(metrics.ResolveMetricType(types, parsedKey.Name)), } protoMetrics = append(protoMetrics, protoMetric) diff --git a/fodc/agent/internal/proxy/client_test.go b/fodc/agent/internal/proxy/client_test.go index 0c5346c3d..d1576d3de 100644 --- a/fodc/agent/internal/proxy/client_test.go +++ b/fodc/agent/internal/proxy/client_test.go @@ -1328,11 +1328,12 @@ func TestProxyClient_sendLatestMetrics_WithUnfinalizedValue(t *testing.T) { allMetrics := ds.GetMetrics() descriptions := ds.GetDescriptions() + types := ds.GetTypes() ctx := context.Background() mockStream := newMockStreamMetricsClient(ctx) - err := pc.sendLatestMetrics(mockStream, allMetrics, descriptions) + err := pc.sendLatestMetrics(mockStream, allMetrics, descriptions, types) require.NoError(t, err) mockStream.mu.Lock() @@ -1371,6 +1372,7 @@ func TestProxyClient_sendFilteredMetrics_TimeWindow(t *testing.T) { timestamps := ds.GetTimestamps() timestampValues := timestamps.GetAllValues() descriptions := ds.GetDescriptions() + types := ds.GetTypes() startTime := now.Add(-90 * time.Minute) endTime := now.Add(-30 * time.Minute) @@ -1382,7 +1384,7 @@ func TestProxyClient_sendFilteredMetrics_TimeWindow(t *testing.T) { ctx := context.Background() mockStream := newMockStreamMetricsClient(ctx) - err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues, descriptions, filter) + err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues, descriptions, types, filter) require.NoError(t, err) mockStream.mu.Lock() @@ -1414,6 +1416,7 @@ func TestProxyClient_sendFilteredMetrics_NoMatchingTimeWindow(t *testing.T) { timestamps := ds.GetTimestamps() timestampValues := timestamps.GetAllValues() descriptions := ds.GetDescriptions() + types := ds.GetTypes() // Filter for future time window (no matches) startTime := now.Add(1 * time.Hour) @@ -1426,7 +1429,7 @@ func TestProxyClient_sendFilteredMetrics_NoMatchingTimeWindow(t *testing.T) { ctx := context.Background() mockStream := newMockStreamMetricsClient(ctx) - err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues, descriptions, filter) + err := pc.sendFilteredMetrics(mockStream, allMetrics, timestampValues, descriptions, types, filter) require.NoError(t, err) mockStream.mu.Lock() diff --git a/fodc/agent/testhelper/flightrecorder.go b/fodc/agent/testhelper/flightrecorder.go index 4b3390cf2..76db6c0af 100644 --- a/fodc/agent/testhelper/flightrecorder.go +++ b/fodc/agent/testhelper/flightrecorder.go @@ -42,6 +42,7 @@ func NewFlightRecorder(capacitySize int64) *flightrecorder.FlightRecorder { type RawMetric struct { Desc string Name string + Type string Labels []Label Value float64 } @@ -73,6 +74,7 @@ func UpdateMetrics(fr interface{}, rawMetrics []RawMetric) error { Name: rawMetrics[idx].Name, Value: rawMetrics[idx].Value, Desc: rawMetrics[idx].Desc, + Type: rawMetrics[idx].Type, Labels: internalLabels, } } diff --git a/fodc/proxy/internal/api/server.go b/fodc/proxy/internal/api/server.go index f8c9338fa..3673beafd 100644 --- a/fodc/proxy/internal/api/server.go +++ b/fodc/proxy/internal/api/server.go @@ -265,82 +265,137 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { } // formatPrometheusText formats aggregated metrics as Prometheus text format. +// +// Metrics with a known Type (non-empty) are emitted using the real type. Metrics with +// Type=="" (pre-upgrade agents) fall back to the legacy suffix-heuristic path so behavior +// is unchanged for those metrics. func (s *Server) formatPrometheusText(aggregatedMetrics []*metrics.AggregatedMetric) string { if len(aggregatedMetrics) == 0 { return "" } - metricMap := make(map[string]*metricGroup) - histogramBases := make(map[string]bool) + // Partition into typed (type known from agent) and untyped (legacy/unknown). + var typed []*metrics.AggregatedMetric + var untyped []*metrics.AggregatedMetric + for _, m := range aggregatedMetrics { + if m.Type != "" { + typed = append(typed, m) + } else { + untyped = append(untyped, m) + } + } - for _, metric := range aggregatedMetrics { - key := metric.Name - group, exists := metricMap[key] + var builder strings.Builder + remainingUntyped := writeTypedFamilies(&builder, typed, untyped) + writeUntypedFamilies(&builder, remainingUntyped) + return builder.String() +} + +// writeTypedFamilies groups typed metrics by family base and emits one "# TYPE" line per +// family using the real type. The family base for histogram/summary component series +// (_bucket/_sum/_count) is the name with the suffix stripped; bare summary quantile series +// and all counter/gauge/untyped series use the name as-is. Untyped metrics whose base +// collides with a typed family are absorbed under the authoritative typed line — this +// prevents a mixed-version rollout from emitting two conflicting "# TYPE" lines for one +// name. The untyped metrics with no typed-family collision are returned for the legacy path. +func writeTypedFamilies(builder *strings.Builder, typed, untyped []*metrics.AggregatedMetric) []*metrics.AggregatedMetric { + typedFamilyOrder := make([]string, 0) + typedFamilies := make(map[string]*metricGroup) // base → group + for _, m := range typed { + base := typedFamilyBase(m.Name, m.Type) + grp, exists := typedFamilies[base] if !exists { - group = &metricGroup{ - name: metric.Name, - description: metric.Description, + grp = &metricGroup{ + name: base, + description: m.Description, + metricType: m.Type, metrics: make([]*metrics.AggregatedMetric, 0), } - metricMap[key] = group + typedFamilies[base] = grp + typedFamilyOrder = append(typedFamilyOrder, base) } - group.metrics = append(group.metrics, metric) - - if strings.HasSuffix(metric.Name, "_bucket") || - strings.HasSuffix(metric.Name, "_sum") || - strings.HasSuffix(metric.Name, "_count") { - baseName := getHistogramBaseName(metric.Name) - if baseName != "" { - histogramBases[baseName] = true + grp.metrics = append(grp.metrics, m) + } + + remainingUntyped := make([]*metrics.AggregatedMetric, 0, len(untyped)) + for _, m := range untyped { + grp := matchTypedFamily(typedFamilies, m.Name) + if grp == nil { + remainingUntyped = append(remainingUntyped, m) + continue + } + if grp.description == "" && m.Description != "" { + grp.description = m.Description + } + grp.metrics = append(grp.metrics, m) + } + + sort.Strings(typedFamilyOrder) + for _, base := range typedFamilyOrder { + grp := typedFamilies[base] + if grp.description != "" { + builder.WriteString(fmt.Sprintf("# HELP %s %s\n", base, grp.description)) + } + builder.WriteString(fmt.Sprintf("# TYPE %s %s\n", base, grp.metricType)) + for _, m := range grp.metrics { + builder.WriteString(formatMetricLine(m)) + } + } + return remainingUntyped +} + +// writeUntypedFamilies emits metrics with no known type using the legacy suffix heuristic: +// a base name with a _bucket/_sum/_count sibling is treated as a histogram, everything else +// as a gauge. This preserves pre-typed behavior for pre-upgrade agents. +func writeUntypedFamilies(builder *strings.Builder, untyped []*metrics.AggregatedMetric) { + if len(untyped) == 0 { + return + } + metricMap := make(map[string]*metricGroup) + histogramBases := make(map[string]bool) + for _, m := range untyped { + grp, exists := metricMap[m.Name] + if !exists { + grp = &metricGroup{ + name: m.Name, + description: m.Description, + metrics: make([]*metrics.AggregatedMetric, 0), } + metricMap[m.Name] = grp + } + grp.metrics = append(grp.metrics, m) + if baseName := getHistogramBaseName(m.Name); baseName != "" { + histogramBases[baseName] = true } } - histogramMetrics := make(map[string][]*metrics.AggregatedMetric) + histogramMetricsMap := make(map[string][]*metrics.AggregatedMetric) regularMetrics := make(map[string]*metricGroup) - - for name, group := range metricMap { + for name, grp := range metricMap { baseName := getHistogramBaseName(name) if baseName != "" && histogramBases[baseName] { - histogramMetrics[baseName] = append(histogramMetrics[baseName], group.metrics...) + histogramMetricsMap[baseName] = append(histogramMetricsMap[baseName], grp.metrics...) } else { - regularMetrics[name] = group + regularMetrics[name] = grp } } - var builder strings.Builder - histogramNames := make([]string, 0, len(histogramBases)) for baseName := range histogramBases { histogramNames = append(histogramNames, baseName) } sort.Strings(histogramNames) - for _, baseName := range histogramNames { - allMetrics := histogramMetrics[baseName] - if len(allMetrics) == 0 { + allM := histogramMetricsMap[baseName] + if len(allM) == 0 { continue } - - description := allMetrics[0].Description - if description != "" { + if description := allM[0].Description; description != "" { builder.WriteString(fmt.Sprintf("# HELP %s %s\n", baseName, description)) } builder.WriteString(fmt.Sprintf("# TYPE %s histogram\n", baseName)) - - for _, metric := range allMetrics { - labelParts := make([]string, 0, len(metric.Labels)) - for key, value := range metric.Labels { - labelParts = append(labelParts, fmt.Sprintf(`%s="%s"`, key, value)) - } - sort.Strings(labelParts) - - labelStr := "" - if len(labelParts) > 0 { - labelStr = "{" + strings.Join(labelParts, ",") + "}" - } - - builder.WriteString(fmt.Sprintf("%s%s %s\n", metric.Name, labelStr, formatFloat(metric.Value))) + for _, m := range allM { + builder.WriteString(formatMetricLine(m)) } } @@ -349,31 +404,76 @@ func (s *Server) formatPrometheusText(aggregatedMetrics []*metrics.AggregatedMet regularNames = append(regularNames, name) } sort.Strings(regularNames) - for _, name := range regularNames { - group := regularMetrics[name] - if group.description != "" { - builder.WriteString(fmt.Sprintf("# HELP %s %s\n", group.name, group.description)) + grp := regularMetrics[name] + if grp.description != "" { + builder.WriteString(fmt.Sprintf("# HELP %s %s\n", grp.name, grp.description)) + } + builder.WriteString(fmt.Sprintf("# TYPE %s gauge\n", grp.name)) + for _, m := range grp.metrics { + builder.WriteString(formatMetricLine(m)) } - builder.WriteString(fmt.Sprintf("# TYPE %s gauge\n", group.name)) + } +} - for _, metric := range group.metrics { - labelParts := make([]string, 0, len(metric.Labels)) - for key, value := range metric.Labels { - labelParts = append(labelParts, fmt.Sprintf(`%s="%s"`, key, value)) +// matchTypedFamily returns the typed family an untyped series belongs to, or nil if none. +// It checks the exact name first (counter/gauge/untyped series and bare summary quantile +// series), then the histogram/summary component base after trimming a trailing +// _bucket/_sum/_count suffix. Used to fold pre-upgrade (untyped) samples into the +// authoritative typed family so the proxy never emits two TYPE lines for one name. +func matchTypedFamily(typedFamilies map[string]*metricGroup, name string) *metricGroup { + if grp, ok := typedFamilies[name]; ok { + return grp + } + for _, suffix := range []string{"_bucket", "_sum", "_count"} { + if strings.HasSuffix(name, suffix) { + base := strings.TrimSuffix(name, suffix) + if base != "" { + if grp, ok := typedFamilies[base]; ok { + return grp + } } - sort.Strings(labelParts) + break + } + } + return nil +} - labelStr := "" - if len(labelParts) > 0 { - labelStr = "{" + strings.Join(labelParts, ",") + "}" +// typedFamilyBase returns the Prometheus family base name for a typed series. +// For histogram/summary component series the trailing _bucket/_sum/_count is stripped. +// For all other types (counter, gauge, untyped) the name is used as-is. +func typedFamilyBase(name, metricType string) string { + switch metricType { + case "histogram", "summary": + for _, suffix := range []string{"_bucket", "_sum", "_count"} { + if strings.HasSuffix(name, suffix) { + base := strings.TrimSuffix(name, suffix) + if base != "" { + return base + } } - - builder.WriteString(fmt.Sprintf("%s%s %s\n", group.name, labelStr, formatFloat(metric.Value))) } } + return name +} - return builder.String() +// labelValueEscaper escapes the three characters that are special inside a Prometheus +// text-exposition label value: backslash, double-quote, and line feed. The replacements are +// applied in a single left-to-right pass, so an escaped backslash is not re-escaped. +var labelValueEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", `\n`) + +// formatMetricLine renders a single metric sample as a Prometheus text line. +func formatMetricLine(m *metrics.AggregatedMetric) string { + labelParts := make([]string, 0, len(m.Labels)) + for k, v := range m.Labels { + labelParts = append(labelParts, fmt.Sprintf(`%s="%s"`, k, labelValueEscaper.Replace(v))) + } + sort.Strings(labelParts) + labelStr := "" + if len(labelParts) > 0 { + labelStr = "{" + strings.Join(labelParts, ",") + "}" + } + return fmt.Sprintf("%s%s %s\n", m.Name, labelStr, formatFloat(m.Value)) } // formatMetricsWindowJSON formats aggregated metrics as JSON for metrics-windows endpoint. @@ -475,6 +575,7 @@ func getHistogramBaseName(metricName string) string { type metricGroup struct { name string description string + metricType string metrics []*metrics.AggregatedMetric } diff --git a/fodc/proxy/internal/api/server_test.go b/fodc/proxy/internal/api/server_test.go index a4cf4ba1f..572dc2369 100644 --- a/fodc/proxy/internal/api/server_test.go +++ b/fodc/proxy/internal/api/server_test.go @@ -102,6 +102,11 @@ func newTestServerOnly(t *testing.T) *Server { } func createTestMetric(name string, value float64, labels map[string]string, timestamp time.Time) *metrics.AggregatedMetric { + return createTestMetricWithType(name, value, labels, timestamp, "") +} + +// createTestMetricWithType creates a test metric with an explicit Prometheus type. +func createTestMetricWithType(name string, value float64, labels map[string]string, timestamp time.Time, metricType string) *metrics.AggregatedMetric { if labels == nil { labels = make(map[string]string) } @@ -112,6 +117,7 @@ func createTestMetric(name string, value float64, labels map[string]string, time Timestamp: timestamp, AgentID: "test-agent-1", Description: "Test metric description", + Type: metricType, } } @@ -970,6 +976,164 @@ func TestHandleClusterLifecycle_DataInfoEmitted(t *testing.T) { require.Len(t, dataInfo, 1) } +// TestFormatPrometheusText_CounterType verifies that a metric with type=counter emits +// "# TYPE foo_total counter" and is NOT mistaken for a histogram (invariant 2). +func TestFormatPrometheusText_CounterType(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("http_requests_total", 42, map[string]string{"method": "GET"}, now, "counter"), + } + + result := server.formatPrometheusText(metricsList) + + assert.Contains(t, result, "# TYPE http_requests_total counter") + assert.Contains(t, result, "http_requests_total{") + assert.NotContains(t, result, "# TYPE http_requests_total gauge") + assert.NotContains(t, result, "# TYPE http_requests_total histogram") +} + +// TestFormatPrometheusText_CounterWithCountSuffix verifies that a counter named foo_count +// (not a histogram component) keeps type=counter and is NOT folded into histogram base "foo" +// (invariant 3 — the key fix for the mislabeled-histogram defect). +func TestFormatPrometheusText_CounterWithCountSuffix(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("banyandb_grpc_request_count", 100, map[string]string{"pod_name": "p1"}, now, "counter"), + } + + result := server.formatPrometheusText(metricsList) + + assert.Contains(t, result, "# TYPE banyandb_grpc_request_count counter") + // Must NOT be folded into a histogram base called "banyandb_grpc_request" + assert.NotContains(t, result, "# TYPE banyandb_grpc_request histogram") + assert.Contains(t, result, "banyandb_grpc_request_count{") +} + +// TestFormatPrometheusText_HistogramTyped verifies that histogram component series with +// type=histogram share exactly one "# TYPE foo histogram" line (invariant 4). +func TestFormatPrometheusText_HistogramTyped(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + labels := map[string]string{"pod_name": "p1"} + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("request_latency_bucket", 10, labels, now, "histogram"), + createTestMetricWithType("request_latency_sum", 1.5, labels, now, "histogram"), + createTestMetricWithType("request_latency_count", 10, labels, now, "histogram"), + } + + result := server.formatPrometheusText(metricsList) + + // Exactly one TYPE line for the base name. + assert.Equal(t, 1, strings.Count(result, "# TYPE request_latency histogram"), + "must emit exactly one TYPE histogram line") + assert.Contains(t, result, "request_latency_bucket{") + assert.Contains(t, result, "request_latency_sum{") + assert.Contains(t, result, "request_latency_count{") + // Component names must not get their own TYPE lines. + assert.NotContains(t, result, "# TYPE request_latency_bucket") + assert.NotContains(t, result, "# TYPE request_latency_sum") + assert.NotContains(t, result, "# TYPE request_latency_count") +} + +// TestFormatPrometheusText_SummaryTyped verifies that summary families emit exactly one +// "# TYPE foo summary" line and all series (quantile, _sum, _count) fall under it (invariant 5). +func TestFormatPrometheusText_SummaryTyped(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + labelsQ := map[string]string{"quantile": "0.5"} + labelsBase := map[string]string{} + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("go_gc_duration_seconds", 0.0001, labelsQ, now, "summary"), + createTestMetricWithType("go_gc_duration_seconds_sum", 0.5, labelsBase, now, "summary"), + createTestMetricWithType("go_gc_duration_seconds_count", 42, labelsBase, now, "summary"), + } + + result := server.formatPrometheusText(metricsList) + + // Exactly one TYPE line for the base name. + assert.Equal(t, 1, strings.Count(result, "# TYPE go_gc_duration_seconds summary"), + "must emit exactly one TYPE summary line") + assert.Contains(t, result, "go_gc_duration_seconds{") + assert.Contains(t, result, "go_gc_duration_seconds_sum") + assert.Contains(t, result, "go_gc_duration_seconds_count") + // Must NOT emit a separate gauge TYPE line for the bare name. + assert.NotContains(t, result, "# TYPE go_gc_duration_seconds gauge") +} + +// TestFormatPrometheusText_MixedVersionDedup verifies that when the same metric family +// arrives both typed (upgraded agent) and untyped (pre-upgrade agent) in one scrape, the +// proxy emits exactly ONE "# TYPE" line — the typed one — and folds the untyped samples +// under it, rather than emitting a conflicting second TYPE line that Prometheus rejects. +func TestFormatPrometheusText_MixedVersionDedup(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("process_cpu_seconds_total", 5, map[string]string{"pod_name": "new"}, now, "counter"), + createTestMetricWithType("process_cpu_seconds_total", 3, map[string]string{"pod_name": "old"}, now, ""), + } + + result := server.formatPrometheusText(metricsList) + + assert.Equal(t, 1, strings.Count(result, "# TYPE process_cpu_seconds_total"), + "must emit exactly one TYPE line for the family") + assert.Contains(t, result, "# TYPE process_cpu_seconds_total counter") + assert.NotContains(t, result, "# TYPE process_cpu_seconds_total gauge") + // Both the upgraded and pre-upgrade samples must still be present. + assert.Contains(t, result, `pod_name="new"`) + assert.Contains(t, result, `pod_name="old"`) +} + +// TestFormatPrometheusText_MixedVersionHistogramDedup verifies the same dedup for histogram +// component series: untyped _bucket/_sum/_count samples from a pre-upgrade agent fold under +// the typed histogram family rather than spawning a second (conflicting) TYPE line. +func TestFormatPrometheusText_MixedVersionHistogramDedup(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + newPod := map[string]string{"le": "0.1", "pod_name": "new"} + oldPod := map[string]string{"le": "0.1", "pod_name": "old"} + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("req_latency_bucket", 10, newPod, now, "histogram"), + createTestMetricWithType("req_latency_sum", 1.0, map[string]string{"pod_name": "new"}, now, "histogram"), + createTestMetricWithType("req_latency_count", 10, map[string]string{"pod_name": "new"}, now, "histogram"), + createTestMetricWithType("req_latency_bucket", 8, oldPod, now, ""), + createTestMetricWithType("req_latency_sum", 0.8, map[string]string{"pod_name": "old"}, now, ""), + createTestMetricWithType("req_latency_count", 8, map[string]string{"pod_name": "old"}, now, ""), + } + + result := server.formatPrometheusText(metricsList) + + assert.Equal(t, 1, strings.Count(result, "# TYPE req_latency histogram"), + "must emit exactly one histogram TYPE line for the family") + // No component or gauge TYPE lines leaked from the legacy path. + assert.NotContains(t, result, "# TYPE req_latency_bucket") + assert.NotContains(t, result, "# TYPE req_latency_sum") + assert.NotContains(t, result, "# TYPE req_latency_count") + assert.NotContains(t, result, "# TYPE req_latency gauge") + // Pre-upgrade samples still present. + assert.Contains(t, result, `pod_name="old"`) +} + +// TestFormatPrometheusText_EscapesLabelValues verifies that label values containing the +// Prometheus-special characters (backslash, double-quote, line feed) are escaped, so a value +// with special characters cannot produce malformed exposition output that breaks the scrape. +func TestFormatPrometheusText_EscapesLabelValues(t *testing.T) { + server := newTestServerOnly(t) + now := time.Now() + metricsList := []*metrics.AggregatedMetric{ + createTestMetricWithType("http_requests_total", 1, map[string]string{ + "path": `a"b\c` + "\n" + "d", + }, now, "counter"), + } + + result := server.formatPrometheusText(metricsList) + + assert.Contains(t, result, `path="a\"b\\c\nd"`) + // The raw, unescaped sequence must not leak into the output. + assert.NotContains(t, result, `path="a"b\c`) +} + func TestHandleClusterLifecycle_StatusReportsUseProtoJSON(t *testing.T) { initTestLogger(t) testLogger := logger.GetLogger("test", "api") diff --git a/fodc/proxy/internal/metrics/aggregator.go b/fodc/proxy/internal/metrics/aggregator.go index 6224cb560..986352a62 100644 --- a/fodc/proxy/internal/metrics/aggregator.go +++ b/fodc/proxy/internal/metrics/aggregator.go @@ -44,6 +44,7 @@ type AggregatedMetric struct { Name string AgentID string Description string + Type string // lowercase prometheus type: "counter","gauge","histogram","summary","untyped","" Value float64 } @@ -88,6 +89,25 @@ func (ma *Aggregator) SetGRPCService(grpcService RequestSender) { ma.grpcService = grpcService } +// protoMetricTypeToString converts a proto MetricType enum to its lowercase string representation. +// UNSPECIFIED maps to "" (unknown, causes proxy to fall back to suffix heuristic). +func protoMetricTypeToString(mt fodcv1.MetricType) string { + switch mt { + case fodcv1.MetricType_METRIC_TYPE_COUNTER: + return "counter" + case fodcv1.MetricType_METRIC_TYPE_GAUGE: + return "gauge" + case fodcv1.MetricType_METRIC_TYPE_HISTOGRAM: + return "histogram" + case fodcv1.MetricType_METRIC_TYPE_SUMMARY: + return "summary" + case fodcv1.MetricType_METRIC_TYPE_UNTYPED: + return "untyped" + default: + return "" + } +} + // ProcessMetricsFromAgent processes metrics received from an agent. func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID string, agentInfo *registry.AgentInfo, req *fodcv1.StreamMetricsRequest) error { aggregatedMetrics := make([]*AggregatedMetric, 0, len(req.Metrics)) @@ -119,6 +139,7 @@ func (ma *Aggregator) ProcessMetricsFromAgent(ctx context.Context, agentID strin Timestamp: timestamp, AgentID: agentID, Description: metric.Description, + Type: protoMetricTypeToString(metric.Type), } aggregatedMetrics = append(aggregatedMetrics, aggregatedMetric) diff --git a/go.mod b/go.mod index b104d51c8..4a7615414 100644 --- a/go.mod +++ b/go.mod @@ -189,8 +189,8 @@ require ( github.com/pelletier/go-toml/v2 v2.3.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/client_model v0.6.2 + github.com/prometheus/common v0.67.5 github.com/prometheus/procfs v0.20.1 // indirect github.com/robfig/cron/v3 v3.0.1 github.com/sagikazarmark/locafero v0.12.0 // indirect
