kezhenxu94 commented on a change in pull request #42: URL: https://github.com/apache/skywalking-satellite/pull/42#discussion_r645427895
########## File path: plugins/fetcher/prometheus/queue_appender.go ########## @@ -0,0 +1,117 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package prometheus + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +// QueueAppender todo appender with queue Review comment: Is this `todo` to be done in the future? ########## File path: plugins/fetcher/prometheus/queue_appender.go ########## @@ -0,0 +1,117 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package prometheus + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +// QueueAppender todo appender with queue +type QueueAppender struct { + Ctx context.Context + Ms *metadataService + isNew bool + job string + instance string + metricBuilder *metricBuilder + useStartTimeMetric bool + OutputChannel chan *v1.SniffData +} + +// NewQueueAppender construct QueueAppender +func NewQueueAppender(ctx context.Context, ms *metadataService, oc chan *v1.SniffData, useStartTimeMetric bool) *QueueAppender { + return &QueueAppender{Ctx: ctx, Ms: ms, OutputChannel: oc, isNew: true, useStartTimeMetric: useStartTimeMetric} +} + +func (qa *QueueAppender) initAppender(ls labels.Labels) error { + job, instance := ls.Get(model.JobLabel), ls.Get(model.InstanceLabel) + if job == "" || instance == "" { + // errNoJobInstance + return fmt.Errorf("errNoJobInstance") + } + // discover the binding target when this method is called for the first time during a transaction + mc, err := qa.Ms.Get(job, instance) + if err != nil { + return err + } + qa.job = job + qa.instance = instance + qa.metricBuilder = newMetricBuilder(mc, qa.useStartTimeMetric) + qa.isNew = false + return nil +} + +var _ storage.Appender = (*QueueAppender)(nil) + +// always returns 0 to disable label caching Review comment: Comments of public methods should start with the method name. The same for the other methods in this PR ########## File path: plugins/fetcher/prometheus/metric_family.go ########## @@ -0,0 +1,381 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package prometheus + +import ( + "fmt" + "math" + "sort" + "strconv" + "strings" + + "github.com/prometheus/common/model" + v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/scrape" +) + +type MetricFamily interface { + Add(metricName string, ls labels.Labels, t int64, v float64) error + IsSameFamily(metricName string) bool + // to OTLP metrics + // will return 1. metricspb.Metric with timeseries 2. counter all of timeseries 3. count dropped timeseries + ToMetric() []*v3.MeterData +} + +type metricFamily struct { + name string + mtype textparse.MetricType + mc MetadataCache + labelKeys map[string]bool + labelKeysOrdered []string + metadata *scrape.MetricMetadata + groupOrders map[string]int + groups map[string]*metricGroup +} + +type metricGroup struct { + family *metricFamily + name string + ts int64 + ls labels.Labels + hasCount bool + count float64 + hasSum bool + sum float64 + value float64 + complexValue []*dataPoint +} + +type dataPoint struct { + value float64 + boundary float64 +} + +func normalizeMetricName(name string) string { + for _, s := range trimmableSuffixes { + if strings.HasSuffix(name, s) && name != s { + return strings.TrimSuffix(name, s) + } + } + return name +} + +func newMetricFamily(metricName string, mc MetadataCache) MetricFamily { + familyName := normalizeMetricName(metricName) + // lookup metadata based on familyName + metadata, ok := mc.Metadata(familyName) + if !ok && metricName != familyName { + // use the original metricName as metricFamily + familyName = metricName + // perform a 2nd lookup with the original metric name. it can happen if there's a metric which is not histogram + // or summary, but ends with one of those _count/_sum suffixes + metadata, ok = mc.Metadata(metricName) + // still not found, this can happen when metric has no TYPE HINT + if !ok { + metadata.Metric = familyName + metadata.Type = textparse.MetricTypeUnknown + } + } + + return &metricFamily{ + name: familyName, + mtype: metadata.Type, + mc: mc, + labelKeys: make(map[string]bool), + labelKeysOrdered: make([]string, 0), + metadata: &metadata, + groupOrders: make(map[string]int), + groups: make(map[string]*metricGroup), + } +} + +func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error { + groupKey := mf.getGroupKey(ls) + mg := mf.loadMetricGroupOrCreate(groupKey, ls, t) + switch mf.mtype { + case textparse.MetricTypeCounter: + mg.value = v + case textparse.MetricTypeGauge: + mg.value = v + case textparse.MetricTypeHistogram: + if strings.HasSuffix(metricName, metricsSuffixCount) { + mg.hasCount = true + mg.count = v + mg.name = metricName + } else if strings.HasSuffix(metricName, metricsSuffixSum) { + mg.hasSum = true + mg.sum = v + mg.name = metricName + } else if strings.HasSuffix(metricName, metricsSuffixBucket) { + boundary, err := getBoundary(mf.mtype, ls) + if err != nil { + return err + } + mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary}) + } + mg.ts = t + case textparse.MetricTypeSummary: + if strings.HasSuffix(metricName, metricsSuffixCount) { + mg.hasCount = true + mg.count = v + mg.name = metricName + } else if strings.HasSuffix(metricName, metricsSuffixSum) { + mg.hasSum = true + mg.sum = v + mg.name = metricName + } else { + mg.value = v + mg.name = metricName + } + mg.ts = t + default: + mg.value = v + mg.name = metricName + } + return nil +} + +func getBoundary(metricType textparse.MetricType, lbs labels.Labels) (float64, error) { + labelName := "" + switch metricType { + case textparse.MetricTypeHistogram: + labelName = model.BucketLabel + case textparse.MetricTypeSummary: + labelName = model.QuantileLabel + default: + return 0, fmt.Errorf("errNoBoundaryLabel") + } + + v := lbs.Get(labelName) + if v == "" { + return 0, fmt.Errorf("errEmptyBoundaryLabel") + } + + return strconv.ParseFloat(v, 64) +} + +func (mf *metricFamily) toMeterSingleValue(mg *metricGroup) *v3.MeterSingleValue { + var result *v3.MeterSingleValue + if mg.hasCount { + result = &v3.MeterSingleValue{ + Name: mg.name, + Labels: mf.convertLabels(mg), + Value: mg.count, + } + } else if mg.hasSum { + result = &v3.MeterSingleValue{ + Name: mg.name, + Labels: mf.convertLabels(mg), + Value: mg.sum, + } + } else { + result = &v3.MeterSingleValue{ + Name: mg.name, + Labels: mf.convertLabels(mg), + Value: mg.value, + } + } + return result +} + +func (mf *metricFamily) ToMetric() []*v3.MeterData { + result := make([]*v3.MeterData, 0) + switch mf.mtype { + case textparse.MetricTypeSummary: + for _, mg := range mf.getGroups() { + msv := mf.toMeterSingleValue(mg) + result = append(result, &v3.MeterData{ + Metric: &v3.MeterData_SingleValue{SingleValue: msv}, + Timestamp: mg.ts, + }) + } + case textparse.MetricTypeHistogram: + for _, mg := range mf.getGroups() { + if mg.hasCount { + msv := &v3.MeterSingleValue{ + Name: mg.name, + Labels: mf.convertLabels(mg), + Value: mg.count, + } + result = append(result, &v3.MeterData{ + Metric: &v3.MeterData_SingleValue{SingleValue: msv}, + Timestamp: mg.ts, + }) + continue + } + if mg.hasSum { + msv := &v3.MeterSingleValue{ + Name: mg.name, + Labels: mf.convertLabels(mg), + Value: mg.sum, + } + result = append(result, &v3.MeterData{ + Metric: &v3.MeterData_SingleValue{SingleValue: msv}, + Timestamp: mg.ts, + }) + continue + } + + bucketMap := make(map[float64]float64) + for _, dp := range mg.complexValue { + bucketMap[dp.boundary] = dp.value + } + sort.Slice(mg.complexValue, func(i, j int) bool { + return mg.complexValue[i].boundary < mg.complexValue[j].boundary + }) + mbs := make([]*v3.MeterBucketValue, 0) + for index, m := range mg.complexValue { + if index == 0 { + mbv := &v3.MeterBucketValue{ + Bucket: float64(math.MinInt64), + Count: int64(m.value), + } + mbs = append(mbs, mbv) + } else { + mbv := &v3.MeterBucketValue{ + Bucket: mg.complexValue[index-1].boundary, + Count: int64(m.value), + } + mbs = append(mbs, mbv) + } + } + mh := &v3.MeterHistogram{ + Name: mf.name, + Labels: mf.convertLabels(mg), + Values: mbs, + } + result = append(result, &v3.MeterData{ + Metric: &v3.MeterData_Histogram{ + Histogram: mh, + }, + Timestamp: mg.ts, + }) + } + default: + for _, mg := range mf.getGroups() { + msv := &v3.MeterSingleValue{ + Name: mf.name, + Labels: mf.convertLabels(mg), + Value: mg.value, + } + result = append(result, &v3.MeterData{ + Metric: &v3.MeterData_SingleValue{SingleValue: msv}, + // job, instance will be added in QueueAppender + Timestamp: mg.ts, + }) + } + } + return result +} + +func (mf *metricFamily) convertLabels(mg *metricGroup) []*v3.Label { + result := make([]*v3.Label, 0) + for k, v := range mg.ls.Map() { + label := &v3.Label{ + Name: k, + Value: v, + } + result = append(result, label) + } + return result +} + +func (mf *metricFamily) getGroups() []*metricGroup { + groups := make([]*metricGroup, len(mf.groupOrders)) + for k, v := range mf.groupOrders { + groups[v] = mf.groups[k] + } + + return groups +} + +func (mf *metricFamily) IsSameFamily(metricName string) bool { + // trim known suffix if necessary + familyName := normalizeMetricName(metricName) + return mf.name == familyName || familyName != metricName && mf.name == metricName +} + +func (mf *metricFamily) getGroupKey(ls labels.Labels) string { + mf.updateLabelKeys(ls) + return dpgSignature(mf.labelKeysOrdered, ls) +} + +func dpgSignature(orderedKnownLabelKeys []string, ls labels.Labels) string { + sign := make([]string, 0, len(orderedKnownLabelKeys)) + for _, k := range orderedKnownLabelKeys { + v := ls.Get(k) + if v == "" { + continue + } + sign = append(sign, k+"="+v) + } + return fmt.Sprintf("%#v", sign) +} + +func (mf *metricFamily) updateLabelKeys(ls labels.Labels) { + for _, l := range ls { + if isUsefulLabel(mf.mtype, l.Name) { + if _, ok := mf.labelKeys[l.Name]; !ok { + mf.labelKeys[l.Name] = true + // use insertion sort to maintain order + i := sort.SearchStrings(mf.labelKeysOrdered, l.Name) + labelKeys := append(mf.labelKeysOrdered, "") + copy(labelKeys[i+1:], labelKeys[i:]) + labelKeys[i] = l.Name + mf.labelKeysOrdered = labelKeys + } + } + } +} + +// todo Review comment: Remove or finish the todo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
