kezhenxu94 commented on a change in pull request #42:
URL: 
https://github.com/apache/skywalking-satellite/pull/42#discussion_r645427895



##########
File path: plugins/fetcher/prometheus/queue_appender.go
##########
@@ -0,0 +1,117 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package prometheus
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/prometheus/common/model"
+       "github.com/prometheus/prometheus/pkg/labels"
+       "github.com/prometheus/prometheus/storage"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+// QueueAppender todo appender with queue

Review comment:
       Is this `todo` to be done in the future?

##########
File path: plugins/fetcher/prometheus/queue_appender.go
##########
@@ -0,0 +1,117 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package prometheus
+
+import (
+       "context"
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/prometheus/common/model"
+       "github.com/prometheus/prometheus/pkg/labels"
+       "github.com/prometheus/prometheus/storage"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+// QueueAppender carries the per-transaction state used while appending
+// scraped samples: the metadata service, the job/instance pair resolved by
+// initAppender, the metric builder, and a channel of *v1.SniffData.
+// It is asserted to implement storage.Appender.
+// TODO: appender with queue (kept from the original comment).
+type QueueAppender struct {
+       // Ctx is the context supplied to NewQueueAppender.
+       Ctx                context.Context
+       // Ms resolves the metadata cache for a job/instance pair (see initAppender).
+       Ms                 *metadataService
+       // isNew is true from construction until initAppender succeeds.
+       isNew              bool
+       // job and instance are the label values captured by initAppender.
+       job                string
+       instance           string
+       // metricBuilder is created by initAppender from the resolved metadata cache.
+       metricBuilder      *metricBuilder
+       // useStartTimeMetric is forwarded to newMetricBuilder.
+       useStartTimeMetric bool
+       // OutputChannel is the channel supplied to NewQueueAppender; presumably
+       // converted data is sent on it, but the sending code is not visible
+       // here. TODO confirm.
+       OutputChannel      chan *v1.SniffData
+}
+
+// NewQueueAppender builds a QueueAppender bound to the given context,
+// metadata service and output channel. The appender starts in the "new"
+// state: its job/instance binding has not been resolved yet.
+func NewQueueAppender(ctx context.Context, ms *metadataService, oc chan *v1.SniffData, useStartTimeMetric bool) *QueueAppender {
+       qa := &QueueAppender{
+               Ctx:                ctx,
+               Ms:                 ms,
+               OutputChannel:      oc,
+               useStartTimeMetric: useStartTimeMetric,
+       }
+       qa.isNew = true
+       return qa
+}
+
+// initAppender resolves the scrape target from the job and instance labels
+// of ls and prepares the metric builder for it. It returns an error when
+// either label is empty or when the metadata lookup fails; in both cases the
+// appender's fields are left untouched.
+func (qa *QueueAppender) initAppender(ls labels.Labels) error {
+       job := ls.Get(model.JobLabel)
+       instance := ls.Get(model.InstanceLabel)
+       if job == "" || instance == "" {
+               return fmt.Errorf("errNoJobInstance")
+       }
+
+       // The binding target is discovered the first time this method is
+       // called during a transaction.
+       cache, err := qa.Ms.Get(job, instance)
+       if err != nil {
+               return err
+       }
+
+       qa.job, qa.instance = job, instance
+       qa.metricBuilder = newMetricBuilder(cache, qa.useStartTimeMetric)
+       qa.isNew = false
+       return nil
+}
+
+// Compile-time check that *QueueAppender satisfies storage.Appender.
+var _ storage.Appender = (*QueueAppender)(nil)
+
+// always returns 0 to disable label caching

Review comment:
       Comments on public methods should start with the method name.
   The same applies to the other methods in this PR.

##########
File path: plugins/fetcher/prometheus/metric_family.go
##########
@@ -0,0 +1,381 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package prometheus
+
+import (
+       "fmt"
+       "math"
+       "sort"
+       "strconv"
+       "strings"
+
+       "github.com/prometheus/common/model"
+       v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+       "github.com/prometheus/prometheus/pkg/labels"
+       "github.com/prometheus/prometheus/pkg/textparse"
+       "github.com/prometheus/prometheus/scrape"
+)
+
+// MetricFamily collects the samples that belong to one metric family and
+// converts them to meter data.
+type MetricFamily interface {
+       // Add records the sample v, observed at timestamp t, for the series
+       // identified by metricName and ls.
+       Add(metricName string, ls labels.Labels, t int64, v float64) error
+       // IsSameFamily reports whether metricName belongs to this family.
+       IsSameFamily(metricName string) bool
+       // ToMetric converts the collected samples to a slice of meter data.
+       ToMetric() []*v3.MeterData
+}
+
+// metricFamily is the MetricFamily implementation built by newMetricFamily.
+type metricFamily struct {
+       // name is the family name: the normalized metric name, or the original
+       // metric name when no metadata exists for the normalized one.
+       name             string
+       // mtype is the metric type taken from the looked-up metadata.
+       mtype            textparse.MetricType
+       // mc is the metadata cache the family was created with.
+       mc               MetadataCache
+       // labelKeys is the set of label names accepted by updateLabelKeys.
+       labelKeys        map[string]bool
+       // labelKeysOrdered holds the same names in sorted order; it drives the
+       // group signature computed by getGroupKey.
+       labelKeysOrdered []string
+       // metadata points at the metadata looked up (or filled in) by newMetricFamily.
+       metadata         *scrape.MetricMetadata
+       // groupOrders maps a group key to that group's index in getGroups' result.
+       groupOrders      map[string]int
+       // groups maps a group key to its metricGroup.
+       groups           map[string]*metricGroup
+}
+
+// metricGroup accumulates the samples of one label-set group inside a
+// metricFamily.
+type metricGroup struct {
+       family       *metricFamily
+       // name is the metric name of a sample recorded by Add; Add overwrites it
+       // in several branches, so it reflects the most recent such sample.
+       name         string
+       // ts is the timestamp attached to the emitted meter data.
+       ts           int64
+       // ls is the label set that convertLabels turns into meter labels.
+       ls           labels.Labels
+       // hasCount/count hold the sample whose name carries the count suffix.
+       hasCount     bool
+       count        float64
+       // hasSum/sum hold the sample whose name carries the sum suffix.
+       hasSum       bool
+       sum          float64
+       // value is the plain sample value.
+       value        float64
+       // complexValue holds the histogram bucket samples appended by Add.
+       complexValue []*dataPoint
+}
+
+// dataPoint is one histogram bucket sample.
+type dataPoint struct {
+       // value is the sample value of the bucket.
+       value    float64
+       // boundary is the bucket boundary parsed by getBoundary.
+       boundary float64
+}
+
+// normalizeMetricName strips the first matching entry of trimmableSuffixes
+// from name. A name that is exactly equal to a suffix is not trimmed by it.
+// The name is returned unchanged when nothing matches.
+func normalizeMetricName(name string) string {
+       for _, suffix := range trimmableSuffixes {
+               if name == suffix || !strings.HasSuffix(name, suffix) {
+                       continue
+               }
+               return name[:len(name)-len(suffix)]
+       }
+       return name
+}
+
+func newMetricFamily(metricName string, mc MetadataCache) MetricFamily {
+       familyName := normalizeMetricName(metricName)
+       // lookup metadata based on familyName
+       metadata, ok := mc.Metadata(familyName)
+       if !ok && metricName != familyName {
+               // use the original metricName as metricFamily
+               familyName = metricName
+               // perform a 2nd lookup with the original metric name. it can 
happen if there's a metric which is not histogram
+               // or summary, but ends with one of those _count/_sum suffixes
+               metadata, ok = mc.Metadata(metricName)
+               // still not found, this can happen when metric has no TYPE HINT
+               if !ok {
+                       metadata.Metric = familyName
+                       metadata.Type = textparse.MetricTypeUnknown
+               }
+       }
+
+       return &metricFamily{
+               name:             familyName,
+               mtype:            metadata.Type,
+               mc:               mc,
+               labelKeys:        make(map[string]bool),
+               labelKeysOrdered: make([]string, 0),
+               metadata:         &metadata,
+               groupOrders:      make(map[string]int),
+               groups:           make(map[string]*metricGroup),
+       }
+}
+
+// Add records one sample of the family in the group selected by ls.
+//
+// Counters and gauges store v as the group's value. Histograms and summaries
+// route v by the suffix of metricName: the count and sum suffixes fill the
+// group's count and sum; a histogram bucket is appended together with its
+// parsed boundary; any other summary sample is stored as the group's value.
+// Every other metric type stores v and metricName on the group.
+//
+// An error is returned only when a histogram bucket boundary cannot be
+// obtained; in that case the group's timestamp is not updated.
+func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error {
+       key := mf.getGroupKey(ls)
+       group := mf.loadMetricGroupOrCreate(key, ls, t)
+
+       switch mf.mtype {
+       case textparse.MetricTypeCounter, textparse.MetricTypeGauge:
+               group.value = v
+       case textparse.MetricTypeHistogram:
+               switch {
+               case strings.HasSuffix(metricName, metricsSuffixCount):
+                       group.hasCount, group.count, group.name = true, v, metricName
+               case strings.HasSuffix(metricName, metricsSuffixSum):
+                       group.hasSum, group.sum, group.name = true, v, metricName
+               case strings.HasSuffix(metricName, metricsSuffixBucket):
+                       boundary, err := getBoundary(mf.mtype, ls)
+                       if err != nil {
+                               return err
+                       }
+                       group.complexValue = append(group.complexValue, &dataPoint{value: v, boundary: boundary})
+               }
+               group.ts = t
+       case textparse.MetricTypeSummary:
+               switch {
+               case strings.HasSuffix(metricName, metricsSuffixCount):
+                       group.hasCount, group.count, group.name = true, v, metricName
+               case strings.HasSuffix(metricName, metricsSuffixSum):
+                       group.hasSum, group.sum, group.name = true, v, metricName
+               default:
+                       group.value, group.name = v, metricName
+               }
+               group.ts = t
+       default:
+               group.value, group.name = v, metricName
+       }
+       return nil
+}
+
+func getBoundary(metricType textparse.MetricType, lbs labels.Labels) (float64, 
error) {
+       labelName := ""
+       switch metricType {
+       case textparse.MetricTypeHistogram:
+               labelName = model.BucketLabel
+       case textparse.MetricTypeSummary:
+               labelName = model.QuantileLabel
+       default:
+               return 0, fmt.Errorf("errNoBoundaryLabel")
+       }
+
+       v := lbs.Get(labelName)
+       if v == "" {
+               return 0, fmt.Errorf("errEmptyBoundaryLabel")
+       }
+
+       return strconv.ParseFloat(v, 64)
+}
+
+func (mf *metricFamily) toMeterSingleValue(mg *metricGroup) 
*v3.MeterSingleValue {
+       var result *v3.MeterSingleValue
+       if mg.hasCount {
+               result = &v3.MeterSingleValue{
+                       Name:   mg.name,
+                       Labels: mf.convertLabels(mg),
+                       Value:  mg.count,
+               }
+       } else if mg.hasSum {
+               result = &v3.MeterSingleValue{
+                       Name:   mg.name,
+                       Labels: mf.convertLabels(mg),
+                       Value:  mg.sum,
+               }
+       } else {
+               result = &v3.MeterSingleValue{
+                       Name:   mg.name,
+                       Labels: mf.convertLabels(mg),
+                       Value:  mg.value,
+               }
+       }
+       return result
+}
+
+// ToMetric converts every group of the family, in group order, to meter
+// data; exactly one entry is produced per group.
+//
+//   - Summary groups become the single value built by toMeterSingleValue.
+//   - Histogram groups become a single value carrying the count (or,
+//     failing that, the sum) when the group has one; otherwise they become a
+//     histogram whose buckets are sorted by boundary. Each bucket is keyed
+//     by the boundary of the preceding bucket, the first one by
+//     float64(math.MinInt64).
+//   - All other types become a single value named after the family.
+//
+// Sorting happens in place, so mg.complexValue is reordered as a side
+// effect.
+//
+// NOTE(review): a histogram group that has a count is emitted only as that
+// count; its sum and buckets are skipped. Confirm this is intended.
+func (mf *metricFamily) ToMetric() []*v3.MeterData {
+       groups := mf.getGroups()
+       result := make([]*v3.MeterData, 0, len(groups))
+
+       // single wraps one scalar of mg as single-value meter data.
+       single := func(name string, mg *metricGroup, value float64) *v3.MeterData {
+               return &v3.MeterData{
+                       Metric: &v3.MeterData_SingleValue{SingleValue: &v3.MeterSingleValue{
+                               Name:   name,
+                               Labels: mf.convertLabels(mg),
+                               Value:  value,
+                       }},
+                       // job, instance will be added in QueueAppender
+                       Timestamp: mg.ts,
+               }
+       }
+
+       switch mf.mtype {
+       case textparse.MetricTypeSummary:
+               for _, mg := range groups {
+                       result = append(result, &v3.MeterData{
+                               Metric:    &v3.MeterData_SingleValue{SingleValue: mf.toMeterSingleValue(mg)},
+                               Timestamp: mg.ts,
+                       })
+               }
+       case textparse.MetricTypeHistogram:
+               for _, mg := range groups {
+                       if mg.hasCount {
+                               result = append(result, single(mg.name, mg, mg.count))
+                               continue
+                       }
+                       if mg.hasSum {
+                               result = append(result, single(mg.name, mg, mg.sum))
+                               continue
+                       }
+
+                       sort.Slice(mg.complexValue, func(i, j int) bool {
+                               return mg.complexValue[i].boundary < mg.complexValue[j].boundary
+                       })
+                       buckets := make([]*v3.MeterBucketValue, 0, len(mg.complexValue))
+                       // Each bucket is keyed by the previous boundary, i.e. its lower bound.
+                       lower := float64(math.MinInt64)
+                       for _, dp := range mg.complexValue {
+                               buckets = append(buckets, &v3.MeterBucketValue{
+                                       Bucket: lower,
+                                       Count:  int64(dp.value),
+                               })
+                               lower = dp.boundary
+                       }
+                       result = append(result, &v3.MeterData{
+                               Metric: &v3.MeterData_Histogram{
+                                       Histogram: &v3.MeterHistogram{
+                                               Name:   mf.name,
+                                               Labels: mf.convertLabels(mg),
+                                               Values: buckets,
+                                       },
+                               },
+                               Timestamp: mg.ts,
+                       })
+               }
+       default:
+               for _, mg := range groups {
+                       result = append(result, single(mf.name, mg, mg.value))
+               }
+       }
+       return result
+}
+
+// convertLabels converts the label set of mg to meter labels.
+//
+// The labels are emitted in the order they appear in mg.ls, so the output
+// is deterministic; going through mg.ls.Map() would randomize the order on
+// every call and allocate an intermediate map. This assumes label names in
+// mg.ls are unique; duplicates, if any, are all emitted.
+func (mf *metricFamily) convertLabels(mg *metricGroup) []*v3.Label {
+       result := make([]*v3.Label, 0, len(mg.ls))
+       for _, l := range mg.ls {
+               result = append(result, &v3.Label{
+                       Name:  l.Name,
+                       Value: l.Value,
+               })
+       }
+       return result
+}
+
+// getGroups returns the family's groups as a slice, each group placed at
+// the index recorded for its key in groupOrders.
+func (mf *metricFamily) getGroups() []*metricGroup {
+       ordered := make([]*metricGroup, len(mf.groupOrders))
+       for key, index := range mf.groupOrders {
+               ordered[index] = mf.groups[key]
+       }
+       return ordered
+}
+
+// IsSameFamily reports whether metricName belongs to this family: either
+// its normalized form equals the family name, or normalization changed the
+// name and the untrimmed name equals the family name.
+func (mf *metricFamily) IsSameFamily(metricName string) bool {
+       normalized := normalizeMetricName(metricName)
+       if mf.name == normalized {
+               return true
+       }
+       return normalized != metricName && mf.name == metricName
+}
+
+// getGroupKey registers the label names of ls with the family and returns
+// the group signature of ls over all label names known so far.
+func (mf *metricFamily) getGroupKey(ls labels.Labels) string {
+       mf.updateLabelKeys(ls)
+       key := dpgSignature(mf.labelKeysOrdered, ls)
+       return key
+}
+
+// dpgSignature builds the group signature of ls: the "key=value" pairs of
+// the known label keys, in the given order and skipping keys whose value in
+// ls is empty, rendered with the %#v verb.
+func dpgSignature(orderedKnownLabelKeys []string, ls labels.Labels) string {
+       pairs := make([]string, 0, len(orderedKnownLabelKeys))
+       for _, key := range orderedKnownLabelKeys {
+               if val := ls.Get(key); val != "" {
+                       pairs = append(pairs, key+"="+val)
+               }
+       }
+       return fmt.Sprintf("%#v", pairs)
+}
+
+// updateLabelKeys records every label name of ls that isUsefulLabel accepts
+// for the family's metric type and that has not been seen before, keeping
+// labelKeysOrdered sorted.
+func (mf *metricFamily) updateLabelKeys(ls labels.Labels) {
+       for _, lbl := range ls {
+               if !isUsefulLabel(mf.mtype, lbl.Name) {
+                       continue
+               }
+               if _, seen := mf.labelKeys[lbl.Name]; seen {
+                       continue
+               }
+               mf.labelKeys[lbl.Name] = true
+
+               // Insert at the sorted position: grow by one slot, shift the tail
+               // right, then drop the new name into the gap.
+               pos := sort.SearchStrings(mf.labelKeysOrdered, lbl.Name)
+               grown := append(mf.labelKeysOrdered, "")
+               copy(grown[pos+1:], grown[pos:])
+               grown[pos] = lbl.Name
+               mf.labelKeysOrdered = grown
+       }
+}
+
+// todo

Review comment:
       Remove or finish the todo




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to